[Python-es] ¿queue.put bloquea?

Chema Cortes pych3m4 en gmail.com
Mar Ene 21 16:20:32 CET 2014


El día 20 de enero de 2014, 15:23, Daπid <davidmenhur en gmail.com> escribió:
>
> 2014/1/18 Chema Cortes <py en ch3m4.org>
>>
>> Este relato es habitual en la programación concurrente.
>>
>> Queue se utiliza para "sincronismo" entre workers. El sincronismo
>> requiere establecer bloqueos, que aumentan en número geométricamente
>> al aumentar el número de workers, haciendo el programa mucho más lento.
>>
>> Si no necesitas "sincronismo", o sea, si no necesitas que los datos se
>> encolen en el mismo orden que se generan (el orden lo podrías
>> recomponer después usando las marcas temporales), usa mejor
>> collections.deque cuyas operaciones son "atómicas" (no bloqueantes).
>> Ojo con lo que puede llegar a crecer si no eres capaz de sacar datos
>> al mismo ritmo que se meten.
>
>
>
> El orden en el que se ejecuten es completamente irrelevante. deque parece
> una opción muy buena, pero, desafortunadamente, no parece ser compatible con
> multiprocessing.

Tienes razón, con multiproceso funciona de otro modo.

La cuestión sería emplear los métodos async, algo así:

import time
from random import random

from collections import deque
from multiprocessing import Pool, freeze_support
from functools import partial

from pprint import pprint

def do_insert(i):
    time.sleep(random()/10)
    return (i, i*i, -i*100)

def do_callback(q, res):
    q.append(res)


if __name__ == "__main__":
    freeze_support()
    q = deque()

    mp = Pool() # núm. se autoajusta según número de cores

    callback = partial(do_callback,q)
    results = [ mp.apply_async(do_insert, (i,), callback=callback)
                    for i in range(100)    ]

    while not all(res.ready() for res in results):
        print ".",
        time.sleep(0.1)

    pprint(q)


En do_insert se haría el procesado de cada dato (he introducido un
retardo aleatorio para que comprobar que el orden final no depende del
orden en que se introducen los datos).

En do_callback se iría añadiendo los datos a la cola deque. Paso la
cola como argumento implícito en una función parcial. También se
podría haber usado como variable global, pero se debe evitar en
multiproceso.

El "cuello de botella" se produce al introducir los datos en la cola.
Si es necesario, se pueden incrementar el número de workers en el
pool. No hay lecturas de la cola, por lo que no se producen bloqueos
de lectura.


>
> Aquí está el esqueleto de mi programa:
>
> https://gist.github.com/Dapid/8520567
>
> Si uso deque, el proceso siempre ve la misma cola vacía. Si lo convierto en
> un Thread, puede leer los datos del programa principal, pero no  los datos
> generados desde un multiprocessing.pool.
>
> ¿Hay alguna otra alternativa?

Depende de para qué, podrías mirarte alguna implementación de
"mapreduce" a ver si te sirve para esta problema:

De más sencillo a más potente:

Octopy: http://code.google.com/p/octopy/
Mincemeat: https://github.com/michaelfairley/mincemeatpy
MrJob: http://pythonhosted.org/mrjob
Spark: http://spark.incubator.apache.org/

-- 
Hyperreals *R  "Quarks, bits y otras criaturas infinitesimales":
http://ch3m4.org/blog
Buscador Python Hispano: http://ch3m4.org/python-es


Más información sobre la lista de distribución Python-es