[Python-checkins] python/nondist/sandbox/collections pairing_heap.py, NONE, 1.1 pairing_heap_test.py, NONE, 1.1

rhettinger at users.sourceforge.net
Fri Apr 2 02:46:05 EST 2004


Update of /cvsroot/python/python/nondist/sandbox/collections
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv21444

Added Files:
	pairing_heap.py pairing_heap_test.py 
Log Message:
Add Dan Stutzbach's first pass at pairing heaps
(after modifying his test suite to use unittest).

The pure python version is being used to work out API issues, develop
thorough tests, and to tease out implementation issues.

Ultimately, this will be added as collections.heap



--- NEW FILE: pairing_heap.py ---
#!/usr/bin/python

"""

Note: this implementation is not currently thread-safe.

The code in this file implements a pairing heap.  By default it
operates as a min-heap, but it can also work as a max-heap via a flag
passed to the heap's __init__ routine.

The important papers on pairing heaps are these:

[1] Michael L. Fredman, Robert Sedgewick, Daniel D. Sleator, and
Robert E. Tarjan.  The Pairing Heap: A New Form of Self-Adjusting
Heap.  Algorithmica 1:111-129, 1986.

[2] John T. Stasko and Jeffrey Scott Vitter.  Pairing heaps:
experiments and analysis.  Communications of the ACM, Volume 30, Issue
3, Pages 234-249, 1987.

[3] Michael L. Fredman.  On the efficiency of pairing heaps and
related data structures.  Journal of the ACM, Volume 46, Issue 4,
Pages 473-501, 1999.

There are actually several varieties of pairing heaps.  Five are
described in [1], and two more are described in [2].  This
implementation is the basic pairing heap described in [1], also known
as the twopass variant.  There isn't much published evidence that any
of the variants is significantly better than the others.

The twopass pairing heap works as follows.

A pairing heap is a tree that maintains the heap property: each node's
value is no greater than the values of its children.  The pairing heap
tree need not be binary nor balanced.  The good amortized efficiency
of the pairing heap results from clever algorithms for .extract() and
.delete().

The basic operation in the pairing heap is the comparison-link.  Two
nodes are compared, and the larger is linked as a child of the
smaller.  To represent the tree, each node maintains a pointer to the
leftmost child and to its right sibling.  Each node also maintains a
pointer to its parent.  When a comparison-link is performed, the new
child becomes the leftmost child.
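
As an illustration only, the comparison-link on this representation
might be sketched as follows.  Node, link, and the attribute names are
hypothetical stand-ins rather than the heap_node class defined later
in this file, and the sketch assumes a min-heap ordered by the plain
< operator.

```python
class Node(object):
    # Hypothetical stand-in for heap_node: each node stores its
    # leftmost child, its right sibling, and its parent.
    def __init__(self, item):
        self.item = item
        self.child = None
        self.sibling = None
        self.parent = None

def link(a, b):
    # Comparison-link two roots of a min-heap.  The root with the
    # larger item becomes the new leftmost child of the other root,
    # which is returned.  Either argument may be None.
    if a is None:
        return b
    if b is None:
        return a
    if b.item < a.item:
        a, b = b, a
    b.parent = a
    b.sibling = a.child   # the old leftmost child moves one to the right
    a.child = b
    return a

# Inserting is just a comparison-link of a new node with the root.
root = None
for item in [5, 3, 8, 1]:
    root = link(Node(item), root)
```

After these four inserts the smallest item sits at the root, and the
most recently linked child of each node is its leftmost child.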

[1] suggests a memory optimization where only the right-most sibling
stores a pointer to the parent, and each node maintains a single bit
indicating if it is that rightmost sibling.  This theoretically saves
the size of a pointer in each node, but only if we can store a bit for
free.  However, storing a bit takes just as much space as storing a
pointer, due to padding.  So, we bite the bullet and store the
pointer.  Also, [3] shows that the amortized cost of .adjust_key()
is at least Omega(log log n) if the parent pointer is *not* stored.
The amortized cost may be better using the parent pointer.

For .extract(), the topmost element is removed and set aside as
the return value.  A left-to-right pass is made over its children,
comparison-linking them pairwise.  For example, if there are 6
children labeled A, B, C, D, E, and F, A and B will be
comparison-linked, as will C and D, and E and F.  Next, the rightmost
node is repeatedly comparison-linked with its sibling until there is
only one node left.  This node is the new root.
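
The two passes can be sketched on their own, independent of the
pointer layout.  The sketch below is an assumption-laden illustration,
not the code in this file: each tree is a hypothetical two-element
list [item, list_of_subtrees], and link, two_pass, extract_min, and
heap_sorted are made-up helper names.

```python
def link(a, b):
    # Comparison-link two trees of the form [item, list_of_subtrees].
    if b[0] < a[0]:
        a, b = b, a
    a[1].insert(0, b)   # the loser becomes the leftmost child
    return a

def two_pass(children):
    # Pass 1: left to right, comparison-link the children pairwise.
    paired = []
    for i in range(0, len(children) - 1, 2):
        paired.append(link(children[i], children[i + 1]))
    if len(children) % 2:
        paired.append(children[-1])   # an odd child is left unpaired
    if not paired:
        return None
    # Pass 2: right to left, fold every tree into the rightmost one.
    root = paired.pop()
    while paired:
        root = link(root, paired.pop())
    return root

def extract_min(root):
    # Return (smallest item, new root) for a non-empty tree.
    return root[0], two_pass(root[1])

def heap_sorted(items):
    # Insert everything, then extract repeatedly.
    root = None
    for item in items:
        node = [item, []]
        root = node if root is None else link(node, root)
    out = []
    while root is not None:
        item, root = extract_min(root)
        out.append(item)
    return out
```

With six children, the first pass produces three trees (A with B, C
with D, E with F) and the second pass folds them together starting
from the tree that holds E and F.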

For .delete(), first the node is cut from its parent.  This results in
a tree rooted at the node being deleted.  The .extract()
procedure is invoked on the node, then the resulting tree is
comparison-linked with the original tree.
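
A rough sketch of that sequence (cut, merge the children, link back)
is given below.  It assumes a min-heap ordered by the plain <
operator; Node, link, cut, merge_children, delete, and drain are
hypothetical names chosen for the illustration and do not match the
classes defined later in this file.

```python
class Node(object):
    # Hypothetical stand-in for heap_node.
    def __init__(self, item):
        self.item = item
        self.child = self.sibling = self.parent = None

def link(a, b):
    # Comparison-link; either argument may be None.
    if a is None:
        return b
    if b is None:
        return a
    if b.item < a.item:
        a, b = b, a
    b.parent = a
    b.sibling = a.child
    a.child = b
    return a

def cut(node):
    # Detach node, together with its subtree, from its parent.
    parent = node.parent
    if parent.child is node:
        parent.child = node.sibling
    else:
        prev = parent.child
        while prev.sibling is not node:
            prev = prev.sibling
        prev.sibling = node.sibling
    node.parent = node.sibling = None

def merge_children(node):
    # Two-pass merge of node's children; returns the merged root.
    kids = []
    child = node.child
    while child is not None:
        nxt = child.sibling
        child.parent = child.sibling = None
        kids.append(child)
        child = nxt
    node.child = None
    # Pass 1: pair up neighbours (a trailing odd child links with None).
    paired = [link(kids[i], kids[i + 1] if i + 1 < len(kids) else None)
              for i in range(0, len(kids), 2)]
    # Pass 2: fold from the rightmost tree towards the left.
    root = None
    while paired:
        root = link(root, paired.pop())
    return root

def delete(root, node):
    # Remove node from the heap rooted at root; return the new root.
    if node is root:
        return merge_children(node)
    cut(node)
    return link(merge_children(node), root)

def drain(root):
    # Repeatedly remove the root, collecting items in sorted order.
    out = []
    while root is not None:
        out.append(root.item)
        root = merge_children(root)
    return out

root = None
nodes = {}
for item in [7, 3, 9, 1, 5]:
    nodes[item] = Node(item)
    root = link(nodes[item], root)
root = delete(root, nodes[9])
remaining = drain(root)
```

The linear scan in cut() is the price of keeping only a leftmost-child
pointer and a right-sibling pointer in each node.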

See [1] for more details.  It also includes figures which are helpful
for understanding.

"""

class WrongHeap (Exception):
    """The user attempted to .delete() or .adjust_key() from the wrong heap.

    This can also occur if the user attempts to delete or adjust a
    node that has already been removed from the heap.
    """
    def __str__(self):
        return 'WrongHeap'

class Underflow (Exception):
    "The user attempted to .peek() or .extract() from an empty heap."
    def __str__(self):
        return 'Underflow'

class IncompatibleHeaps (Exception):
    "The user attempted to meld heaps that have different compare operations."
    def __str__(self):
        return 'IncompatibleHeaps'

class InternalError (Exception):
    "This should never get raised in practice."
    def __str__(self):
        return 'InternalError'

class WrongAdjustKeyDirection (Exception):
    "The user attempted to adjust a key in the wrong direction."
    def __str__(self):
        return 'WrongAdjustKeyDirection'

class heap_node:
    """This class is used internally to implement the heap.

    Users will be able to see references to this data structure,
    as this is necessary so they can pass the reference to the
    .adjust_key() and .delete() methods.  However, the data structure
    should be considered opaque.  The only public methods are .value()
    and .values().

    Users should not instantiate heap_node on their own.

    In the C implementation, all of this will be enforced.

    """
    __slots__ = ('_item', '_child', '_sibling', '_parent')

    def __init__(self, item):
        self._item = item
        self._clean()

    def value(self):
        "Return the value associated with this node."
        return self._item

    def _clean(self):
        self._child = None
        self._sibling = None
        self._parent = None

    def _add_child(self, node):
        node._parent = self
        node._sibling = self._child
        self._child = node

    def _cut(self):
        "Remove this node and its children from this node's parent."
        n = self._parent
        if id(n._child) == id(self):
            n._child = self._sibling
        else:
            n = n._child
            try:
                while id(n._sibling) != id(self):
                    n = n._sibling
                n._sibling = self._sibling
            except AttributeError:
                raise InternalError
        self._parent = None
        self._sibling = None

    def _extract(self, heap):
        "Link this node's children as a prelude to this node's removal"

        #  Make the left-to-right pairing pass
        children = []
        n = self._child
        while n and n._sibling:
            next = n._sibling._sibling
            pair = n._sibling
            n._parent = None
            n._sibling = None
            pair._parent = None
            pair._sibling = None
            children.append(heap._link(n, pair))
            n = next
        if n:
            children.append(n)
        if not children:
            return None

        # Repeated comparison-link to the rightmost node
        root = children[-1]
        for i in range(len(children)-2,-1,-1):
            root = heap._link(root, children[i])
        root._parent = None

        # Return the new root
        return root

    def values(self):
        "Return an unsorted sequence of the values of this part of the tree."
        rv = [self._item,]
        if self._child:
            rv += self._child.values()
        if self._sibling:
            rv += self._sibling.values()
        return rv

class pairing_heap:
    "Implements a min-heap using the pairing-heap algorithm."

    def __init__(self, values=None, cmpfunc=None, key=None, reverse=False):
        self._cmpfunc = cmpfunc
        self._key = key
        self._reverse = reverse
        self._root = None
        self._len = 0
        if values is not None:
            for i in values:
                self.insert(i)

    def empty(self):
        "True if there is nothing in the heap."
        return self._root is None

    def peek(self):
        "Return, but do not remove, the top of the heap."
        if self._root:
            return self._root._item
        else:
            raise Underflow

    def __len__(self):
        "Return the number of items in the heap."
        return self._len

    def _cmp(self, item1, item2):
        "Compare two items using the heap's comparison functions."
        if self._key:
            item1 = getattr(item1, self._key)
            item2 = getattr(item2, self._key)

        if self._cmpfunc:
            c = self._cmpfunc(item1, item2)
        else:
            c = cmp(item1, item2)

        if self._reverse:
            c = -c

        return c

    def _link(self, node1, node2):
        "Perform the comparison-link operation."
        if not node1: return node2
        if not node2: return node1

        c = self._cmp(node1._item, node2._item)

        if c > 0:
            node2._add_child (node1)
            return node2
        else:
            node1._add_child (node2)
            return node1

    def insert(self, item):
        "Insert an item into the heap."
        node = heap_node(item)
        self._root = self._link(node, self._root)
        self._len += 1
        return node

    def meld(self, other):
        """Merge another heap into this one.

        other will be an empty heap after completion
        """

        if not isinstance(other, pairing_heap):
            raise TypeError

        # Check that these heaps are compatible
        if self._cmpfunc != other._cmpfunc or self._reverse != other._reverse \
           or self._key != other._key:
            raise IncompatibleHeaps

        self._root = self._link(other._root, self._root)
        self._len += other._len

        # Destroy other
        other._len = 0
        other._root = None

    def adjust_key(self, node, item):
        """Adjust the value of node to item.

        This must be a decrease for a min-heap or an increase for a max-heap.

        If node is not part of this heap, both heaps may become gibberish.
        """
        self._check_heap_node (node)
        if self._cmp(node._item, item) < 0:
            raise WrongAdjustKeyDirection
        node._item = item
        if id(self._root) == id(node):
            return

        # Cut this node out of the tree
        node._cut()

        # Link it with the root
        self._root = self._link(node, self._root)

    def delete(self, node):
        """Delete a node from the middle of the heap.

        If node is not part of this heap, both heaps may become gibberish.
        """
        self._check_heap_node (node)
        if id(self._root) == id(node):
            # Return the deleted item here too, as the non-root path does.
            return self.extract()
        node._cut()
        self._root = self._link (node._extract(self), self._root)
        self._len -= 1
        node._clean()
        return node._item

    def extract(self, n=0):
        "Extract the top value of the heap."
        if n > 0:
            return [self.extract() for i in xrange(n)]
        if not self._root:
            raise Underflow
        old_root = self._root
        self._root = self._root._extract(self)
        self._len -= 1
        old_root._clean()
        return old_root._item

    def extract_all(self):
        "Empty the heap into a sorted list of all the values"
        return [self.extract() for i in xrange(self._len)]

    def values(self):
        "Return an unsorted list of all the values in the heap"
        if self._root is None:
            # An empty heap has no root node to walk.
            return []
        return self._root.values()

    def _check_heap_node(self, node):
        """Raise an error if node will cause problems.

        This function is a simple check to catch certain user errors.
        It will raise TypeError if node isn't a heap_node object at
        all.  It *may* raise WrongHeap if node isn't part of this heap.
        It will return if node is part of this heap.

        However, no guarantees are made about the outcome if node
        is a heap_node, but part of a different heap.

        """

        if not isinstance(node, heap_node):
            raise TypeError
        if self._root == node:
            return
        if not node._parent:
            raise WrongHeap
        return

--- NEW FILE: pairing_heap_test.py ---
#!/usr/bin/python
from pairing_heap import *
import random
import unittest

datasize = 256

class Holder:
    def __init__(self, item):
        self.xyzzy = item

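def check_invariant(heap, node=None):
    if node is None:
        node = heap._root
    if node is None:
        return True
    n2 = node._child
    while n2:
        # Equal items may legitimately be parent and child.
        if heap._cmp(node._item, n2._item) > 0:
            return False
        # Propagate violations found deeper in the tree.
        if not check_invariant(heap, n2):
            return False
        n2 = n2._sibling
    return True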

# Uncomment to speed things up when doing code-coverage analysis
##def check_invariant(heap, node=None):
##    return True

class TestPairingHeap(unittest.TestCase):

    def setUp(self):
        self.data = [random.random() for i in xrange(datasize)]
        self.data_sorted = sorted(self.data)

    def test_random(self):
        # Push random numbers and pop them off, verifying all's OK.
        heap = pairing_heap()
        self.assert_(check_invariant(heap))
        for item in self.data:
            heap.insert(item)
            self.assert_(check_invariant(heap))
        results = []
        while not heap.empty():
            item = heap.extract()
            self.assert_(check_invariant(heap))
            results.append(item)
        self.assertEqual(self.data_sorted, results)

    def test_naive_nbest(self):
        heap = pairing_heap()
        for item in self.data:
            heap.insert(item)
            if len(heap) > 10:
                heap.extract()
        result = heap.extract_all()
        self.assertEqual(result, self.data_sorted[-10:])

    def test_underflow(self):
        heap = pairing_heap()
        self.assertRaises(Underflow, heap.extract)
        self.assertRaises(Underflow, heap.peek)

    def test_return_unsorted(self):
        heap = pairing_heap()
        for item in self.data:
            heap.insert(item)
        self.assertEqual(set(heap.values()), set(self.data))

    def test_implicit_creation_and_peek(self):
        heap = pairing_heap(self.data)
        self.assert_(check_invariant(heap))
        self.assertEqual(heap.peek(), self.data_sorted[0])
        self.assert_(check_invariant(heap))
        result = heap.extract_all()
        self.assertEqual(result, self.data_sorted)

    def test_extract_with_arg(self):
        heap = pairing_heap(self.data)
        low10 = heap.extract(10)
        self.assert_(check_invariant(heap))
        self.assertEqual(low10, self.data_sorted[:10])

    def test_deleting_from_the_middle(self):
        heap = pairing_heap()
        nodes = [heap.insert(item) for item in self.data]
        data2 = self.data[:]
        count = 0
        for i in xrange(datasize-1,-1,-5):
            heap.delete(nodes[i])
            del nodes[i]
            self.assert_(check_invariant(heap))
            count += 1
            self.assertEqual(datasize-count, len(heap))
            del data2[i]
        result = heap.extract_all()
        data2_sorted = data2[:]
        data2_sorted.sort()
        self.assertEqual(data2_sorted, result)

    def test_deleting_everything_in_random_order(self):
        heap = pairing_heap()
        nodes = [heap.insert(item) for item in self.data]
        for i in xrange(datasize):
            t = random.randrange(len(heap))
            heap.delete(nodes[t])
            del nodes[t]
            self.assert_(check_invariant(heap))
        self.assert_(heap.empty())

    def test_adjust_key(self):
        heap = pairing_heap()
        nodes = [heap.insert(item) for item in self.data]
        data2 = []
        for i in xrange(datasize):
            data2.append(min(self.data[i], random.random()))
        idx = range(datasize)
        random.shuffle(idx)
        for i in idx:
            heap.adjust_key(nodes[i], data2[i])
            self.assert_(check_invariant(heap))
        self.assertEqual(sorted(data2), heap.extract_all())

    def test_build_tree_by_melding(self):
        heap = pairing_heap()
        i = 0
        while i < datasize:
            j = max(random.randrange(min(datasize - i, 5)), 1)
            heap2 = pairing_heap(self.data[i:i+j])
            i += j
            heap.meld(heap2)
            self.assert_(check_invariant(heap))
        self.assertEqual(self.data_sorted, heap.extract_all())

    def test_cmpfunc(self):
        reverse = lambda x, y: -cmp(x,y)
        heap = pairing_heap(self.data, cmpfunc=reverse)
        self.assertEqual(sorted(self.data, reverse=True), heap.extract_all())

    def test_reverse(self):
        heap = pairing_heap(self.data, reverse=True)
        self.assertEqual(sorted(self.data, reverse=True), heap.extract_all())

    def test_cmpfunc_with_reverse(self):
        reverse = lambda x, y: -cmp(x,y)
        heap = pairing_heap(self.data, cmpfunc=reverse, reverse=True)
        self.assertEqual(self.data_sorted, heap.extract_all())

    def test_key(self):
        holders = [Holder(item) for item in self.data]
        heap = pairing_heap(holders, key='xyzzy')
        results = heap.extract_all()
        self.assertEqual(self.data_sorted,[item.xyzzy for item in results])

    def test_extract_values_from_nodes(self):
        heap = pairing_heap()
        nodes = [heap.insert(datum) for datum in self.data]
        for i in xrange(len(nodes)):
            self.assertEqual(nodes[i].value(), self.data[i])
        self.assert_(check_invariant(heap))

    def test_meld_with_incompatible_heap(self):
        heap1 = pairing_heap()
        reverse = lambda x, y: -cmp(x,y)
        self.assertRaises(IncompatibleHeaps, heap1.meld, pairing_heap(cmpfunc = reverse))
        self.assertRaises(IncompatibleHeaps, heap1.meld, pairing_heap(reverse = True))
        self.assertRaises(IncompatibleHeaps, heap1.meld, pairing_heap(key = 'xyzzy'))


    def test_adj_key_wrong_direction(self):
        heap = pairing_heap ()
        nodes = [heap.insert(datum) for datum in range(5)]
        self.assertRaises(WrongAdjustKeyDirection, heap.adjust_key, nodes[4], 100)

    def test_adjust_key_on_existing_root(self):
        heap = pairing_heap ()
        data = self.data
        nodes = [heap.insert(datum) for datum in data]
        low = 0
        low_list = [0]  # indices of every occurrence of the minimum so far
        for i in xrange(1, len(data)):
            if data[i] == data[low]:
                low_list.append (i)
            if data[i] < data[low]:
                low = i
                low_list = [i]
        for i in low_list:
            heap.adjust_key(nodes[i], data[i] - random.random())

    def test_bogus_node_parameter(self):
        heap = pairing_heap ()
        nodes = [heap.insert(datum) for datum in self.data]
        self.assertRaises(TypeError, heap.adjust_key, "muhahahah!", 5)
        self.assert_(check_invariant(heap))
        self.assertRaises(TypeError, heap.delete, "muhahahah!")


    def test_semibogus_node_parameter(self):
        holders = [Holder(item) for item in self.data]
        heap = pairing_heap(key='xyzzy')
        nodes = [heap.insert(datum) for datum in holders]
        for i in xrange(len(holders)):
            holders[i].node = nodes[i]
        top = heap.extract ()
        self.assertRaises(WrongHeap, heap.delete, top.node)
        self.assert_(check_invariant(heap))


if __name__ == '__main__':
    suite = unittest.TestSuite()
    suite.addTest(unittest.makeSuite(TestPairingHeap))
    unittest.TextTestRunner(verbosity=2).run(suite)



