[Python-checkins] r74320 - in python/branches/tk_and_idle_maintenance/Lib/idlelib/test: __init__.py common.py raw_server.py test_rpc.py
guilherme.polo
python-checkins at python.org
Wed Aug 5 18:26:32 CEST 2009
Author: guilherme.polo
Date: Wed Aug 5 18:26:32 2009
New Revision: 74320
Log:
An attempt to start testing rpc, nothing interesting to see yet.
Added:
python/branches/tk_and_idle_maintenance/Lib/idlelib/test/
python/branches/tk_and_idle_maintenance/Lib/idlelib/test/__init__.py (contents, props changed)
python/branches/tk_and_idle_maintenance/Lib/idlelib/test/common.py (contents, props changed)
python/branches/tk_and_idle_maintenance/Lib/idlelib/test/raw_server.py (contents, props changed)
python/branches/tk_and_idle_maintenance/Lib/idlelib/test/test_rpc.py (contents, props changed)
Added: python/branches/tk_and_idle_maintenance/Lib/idlelib/test/__init__.py
==============================================================================
Added: python/branches/tk_and_idle_maintenance/Lib/idlelib/test/common.py
==============================================================================
--- (empty file)
+++ python/branches/tk_and_idle_maintenance/Lib/idlelib/test/common.py Wed Aug 5 18:26:32 2009
@@ -0,0 +1 @@
+RPC_DEBUGGING = False
Added: python/branches/tk_and_idle_maintenance/Lib/idlelib/test/raw_server.py
==============================================================================
--- (empty file)
+++ python/branches/tk_and_idle_maintenance/Lib/idlelib/test/raw_server.py Wed Aug 5 18:26:32 2009
@@ -0,0 +1,11 @@
+import sys
+import socket
+from idlelib import rpc
+from idlelib.test import common
+
+def idle_server(host, port):
+ server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ server.connect((host, port))
+ sockio = rpc.SocketIO(server, debugging=True)#common.RPC_DEBUGGING)
+ sockio.location = 'Server'
+ sockio.mainloop()
Added: python/branches/tk_and_idle_maintenance/Lib/idlelib/test/test_rpc.py
==============================================================================
--- (empty file)
+++ python/branches/tk_and_idle_maintenance/Lib/idlelib/test/test_rpc.py Wed Aug 5 18:26:32 2009
@@ -0,0 +1,77 @@
+import sys
+import time
+import socket
+import unittest
+import subprocess
+from idlelib import rpc
+from idlelib.test import common
+
+# XXX this is not testing anything yet
+
+HOST = '127.0.0.1'
+PORT = 0
+
+class SocketIOTest(unittest.TestCase):
+
+ def setUp(self):
+ client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ client.bind((HOST, PORT))
+ client.listen(1)
+ port = client.getsockname()[1]
+ self.server = subprocess.Popen([sys.executable, "-c",
+ """x = __import__('idlelib.test.raw_server')
+x.test.raw_server.idle_server(%r, %d)""" % (HOST, port)])
+ sock, _ = client.accept()
+ self.sockio = rpc.SocketIO(sock, debugging=True)#common.RPC_DEBUGGING)
+ self.sockio.location = 'Client'
+ print sock.getsockname()
+
+ def tearDown(self):
+ self.server.kill()
+ self.server.wait()
+ self.sockio.close()
+
+ def test_x(self):
+ long_list = range(rpc.BUFSIZE * 2)
+ self.sockio.putmessage(long_list)
+ self.sockio.putmessage(long_list)
+ self.sockio.putmessage(long_list)
+
+
+class RpcTest(unittest.TestCase):
+
+ def setUp(self):
+ # XXX will probably have to subclass RPCClient since its handle_EOF
+ # calls the exithook method which calls os._exit
+ self.client = client = rpc.RPCClient((HOST, PORT))
+ client.debugging = common.RPC_DEBUGGING
+ listening_sock = client.listening_sock
+ listening_sock.settimeout(10)
+ port = str(listening_sock.getsockname()[1])
+ self.server = subprocess.Popen([sys.executable, "-c",
+ "__import__('idlelib.run').run.main()", port])
+ client.accept()
+ client.register("stdout", sys.stdout)
+ client.register("stderr", sys.stderr)
+
+ def tearDown(self):
+ self.server.kill()
+ self.server.wait()
+ self.client.listening_sock.close()
+
+ def test_x(self):
+ client, server = self.client, self.server
+
+ active_seq = client.asyncqueue('exec', 'runcode', ('print "hi"', ), {})
+ while True:
+ response = client.pollresponse(active_seq, wait=0.05)
+ if not response:
+ time.sleep(0.1)
+ else:
+ break
+ how, what = response
+ print active_seq, how, what, "<<"
+
+
+if __name__ == "__main__":
+ unittest.main()
More information about the Python-checkins
mailing list