[Tutor] looping generator
richard kappler
richkappler at gmail.com
Thu Jan 7 13:29:05 EST 2016
Martin, the suggestion you provided was the first I thought of days ago when
we first started this project; however, the constraints preclude us writing
to file. I am reading everything you wrote, though, and looking through the
links you provided. Your assistance is sincerely appreciated.
regards, Richard
On Thu, Jan 7, 2016 at 1:15 PM, Martin A. Brown <martin at linux-ip.net> wrote:
>
> Hi there Richard,
>
> >I have a stream of incoming xml data. I can receive the data, parse
> >the data, etc, so long as I don't get fancy and I have a minuscule
> >delay in between each message. If I get rid of the time delay,
> >which I need to, I need the script to continuously process the
> >incoming messages. Here's what I have:
>
> To begin, I have a suggestion that is not specifically a Python
> suggestion. I have read several of your prior emails describing
> your problem.
>
> If I were faced with the problem of receiving and processing data
> from remote "dumb" nodes, I would separate the software components
> into, at least, two distinct pieces.
>
> 1. Trust the filesystem. Write one software component that
> receives the data from the wire and writes it out to a
> configurable directory. If, for whatever reason, you lose the
> data, the performance of the rest of the system does not
> matter. Thus, capturing the data is the most important first
> step. Use your system's daemonization tools to run this
> service.
>
> 2. Improve performance of the parsing and processing tools.
> Teach the tools how to read the data stored in the filesystem
> and iteratively locate hot spots, performance issues, parsing
> problems or data shortcomings.
>
> Here are a few disorganized thoughts about why and how to do it this
> way:
>
> * the network listener becomes much simpler since it will not
> parse, and will only write out to disk
>
> * let's assume each XML chunk is about 128 KiB and you have 30 data
> sources, each sending 4 chunks per second; the total data rate is
> then 128 KiB x 30 x 4 = 15 MiB per second, which modern hardware
> can comfortably receive and write to disk
>
> * you could also segregate the XML chunks by data source (and
> maybe also time) when writing each chunk into the filesystem; if
> you break each message into its own file, that would be a large
> number of files (with attendant open() and close() costs), so
> perhaps start a new file every fifteen minutes or every hour;
> here's a possible hourly file naming scheme
>
received/2016/01/07/0000-10.143.17.227.data
received/2016/01/07/0100-10.143.17.227.data
received/2016/01/07/0200-10.143.17.227.data
...
received/2016/01/07/2300-10.143.17.227.data
>
> that would leave you with about 720 files per daily directory,
> something that is eminently manageable for modern filesystems
> (and for any pesky humans who happen to be wandering around)
>
> * if you write out the stream of data to the filesystem, your
> network listener need only locate the \x02 byte and the \x03
> byte--it could ensure that every file it wrote contained a first
> byte of \x02 and a final byte of \x03
>
> * you can independently upgrade the parsing and processing tools
> and the data recording service
>
> * if you retain these files, you can "replay" the past (errors,
> bursts, reprocessing); alternatively simply delete the files
> after they are processed for downstream consumers
>
> * separating the responsibilities of each software component also
> simplifies your diagnosis and software authorship process; first
> you can make sure that you are recording the data properly; once
> that is done, you can start to process your data, moving along
> to performance questions next
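> As a sketch of the capture step described above (all names here,
> such as hourly_path and record, are illustrative and not from this
> thread; the path layout follows the naming scheme given earlier):

```python
import datetime
import os

def hourly_path(root, when, source_ip):
    # Build a per-source, per-hour path such as
    #   received/2016/01/07/0200-10.143.17.227.data
    return os.path.join(
        root,
        when.strftime('%Y'), when.strftime('%m'), when.strftime('%d'),
        '{0}-{1}.data'.format(when.strftime('%H00'), source_ip))

def record(chunks, root, source_ip, now=datetime.datetime.utcnow):
    # Append every raw chunk, verbatim, to the file for the current
    # hour.  No parsing happens here: the listener only captures bytes.
    for chunk in chunks:
        path = hourly_path(root, now(), source_ip)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, 'ab') as fh:   # append, binary: capture as-is
            fh.write(chunk)
```

> The parsing and processing tools can then read these files at their
> own pace, entirely independent of the listener.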
>
> Now, below, I have a few Python-specific points or questions:
>
> >#!/usr/bin/env python
> >
> >import socket
> >import lxml.etree as ET
> >
> >def dataRecv(connection):
> > print 'receiving'
> > while True:
> > data = connection.recv(65536)
> > while True:
> > print "writing to data.in"
> > f2.write(data)
> > start = data.find('\x02')
> > end = data.find('\x03')
> > message = data[start+1:end]
> > print "writing to messages.out"
> > f3.write(message)
> > yield message
>
> You do not define f2 and f3 until below. If you are going to do
> this, pass f2 and f3 into the function as parameters, i.e.
>
> def dataRecv(connection, f2, f3):
> ....
>
> while True:
> # wait for a connection
> connection, client_address = sock.accept()
> q = dataRecv(connection, f2, f3)
>
>
> >def dataParse(message):
> > print 'parsing'
> > xslt = ET.parse('stack13.xsl')
> > dom = ET.XML(message)
> > transform = ET.XSLT(xslt)
> > newdom = transform(dom)
> > f1.write(str(newdom))
> >
> >
> >sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
> >sock_addr = ('', 2008)
> >#data = sock.makefile('r')
> >sock.bind(sock_addr)
> >sock.listen(5)
> >print 'listening'
> >
> >f1 = open('parser.out', 'a')
> >print "opening parser.out"
> >f2 = open('data.in', 'a')
> >print "opening data.in"
> >f3 = open('messages.out', 'a')
> >print "opening messages.out"
> >
> >while True:
> > # wait for a connection
> > connection, client_address = sock.accept()
> > q = dataRecv(connection)
> > dataParse(q.next())
> >
> ># close sockrx
> >#connection.close()
> >
> >f1.close()
> >
> >
>
> By the way, keep on breaking these things into functions! This is
> the way to go.
>
> >In the dataRecv function, I have tried (where you see while True)
> >if data, while data and while True. Regardless, it doesn't loop, it
> >receives all ten messages from the test file being sent, but only
> >processes the first message then stops (not exits). I feel like I'm
> >missing something obvious but can't find it.
>
> The problem starts in your dataRecv function:
>
> def dataRecv(connection):
> print 'receiving'
> while True:
> data = connection.recv(65536) # -- A: receive all data
> while True:
> print "writing to data.in"
> f2.write(data)
> start = data.find('\x02')
> end = data.find('\x03')
> message = data[start+1:end]
> print "writing to messages.out"
> f3.write(message)
> yield message # -- B: yield one message
>
> while True:
> # wait for a connection
> connection, client_address = sock.accept() # -- D: wait on network
> q = dataRecv(connection)
> dataParse(q.next()) # -- C: process one message
>
> Consider what happens when you read 64k bytes into the variable
> called 'data'.
>
> A: This probably reads all of the data at once (all ten messages),
> though recv() may just as easily return only part of a message,
> since TCP preserves no message boundaries. You then locate the
> first \x02 and then the following \x03. (N.B. You are also
> assuming that they will occur in that order in your data; they
> might not.)
> B: Then you are happy you have identified the first message.
> You yield it, which is now handled by the dataParse function.
> C: Now, you take that message and parse it.
> D: And, we go back to sock.accept(), leaving all of that unprocessed data
> in the variable 'data' in the dataRecv function.
>
> Specifically, your problem is that you extract only the first
> message and never come back to consume the rest of the buffered
> data. You might benefit from studying techniques for breaking a
> text apart by paragraph. Think about how this applies to your
> problem:
>
>
> http://code.activestate.com/recipes/66063-read-a-text-file-by-paragraph/#c1
>
> N.B. The code example may not be utterly perfect, but it is
> precisely the same problem that you are having.
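> To make that concrete for the \x02/\x03 framing here, a buffering
> generator along these lines should consume every message (a sketch
> only; it assumes connection.recv() returns b'' at end of stream,
> and omits timeouts and error handling):

```python
def messages(connection, bufsize=65536):
    # Yield the body of every complete \x02 ... \x03 message, keeping
    # any trailing partial message in the buffer until more data
    # arrives from the next recv() call.
    buf = b''
    while True:
        data = connection.recv(bufsize)
        if not data:                 # peer closed the connection
            return
        buf += data
        while True:
            start = buf.find(b'\x02')
            end = buf.find(b'\x03', start + 1)
            if start == -1 or end == -1:
                break                # no complete message buffered yet
            yield buf[start + 1:end]
            buf = buf[end + 1:]      # keep everything after this message
```

> The main loop then becomes
>
>     for message in messages(connection):
>         dataParse(message)
>
> which processes every message rather than only the first.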
>
> Good luck and enjoy,
>
> -Martin
>
> --
> Martin A. Brown
> http://linux-ip.net/
>
--
All internal models of the world are approximate. ~ Sebastian Thrun