[Tracker-discuss] [issue581] Add rest endpoint to roundup

Maciej Szulik metatracker at psf.upfronthosting.co.za
Mon Feb 29 17:14:53 EST 2016


Maciej Szulik added the comment:

I was too fast with my previous judgment, thankfully. It's working ok. I've additionally fixed the message returned upon auth error. I think we can consider this done atm. Ezio ptal.

_______________________________________________________
PSF Meta Tracker <metatracker at psf.upfronthosting.co.za>
<http://psf.upfronthosting.co.za/roundup/meta/issue581>
_______________________________________________________
-------------- next part --------------
diff --git a/roundup/cgi/client.py b/roundup/cgi/client.py
--- a/roundup/cgi/client.py
+++ b/roundup/cgi/client.py
@@ -22,6 +22,7 @@
 from roundup.mailer import Mailer, MessageSendError, encode_quopri
 from roundup.cgi import accept_language
 from roundup import xmlrpc
+from roundup import rest
 
 from roundup.anypy.cookie_ import CookieError, BaseCookie, SimpleCookie, \
     get_cookie_date
@@ -378,6 +379,8 @@
         try:
             if self.path == 'xmlrpc':
                 self.handle_xmlrpc()
+            elif self.path == 'rest' or self.path[:5] == 'rest/':
+                self.handle_rest()
             else:
                 self.inner_main()
         finally:
@@ -419,6 +422,33 @@
         self.setHeader("Content-Length", str(len(output)))
         self.write(output)
 
+    def handle_rest(self):
+        # Set the charset and language
+        self.determine_charset()
+        self.determine_language()
+        # Open the database as the correct user.
+        # TODO: add everything to RestfulDispatcher
+        try:
+            self.determine_user()
+        except LoginError, err:
+            self.response_code = http_.client.UNAUTHORIZED
+            self.write({'error': {
+                    'status': http_.client.UNAUTHORIZED,
+                    'msg': 'Unauthorized'
+                }})
+            return
+
+        self.check_anonymous_access()
+
+        # Call rest library to handle the request
+        handler = rest.RestfulInstance(self, self.db)
+        output = handler.dispatch(self.env['REQUEST_METHOD'], self.path,
+                                  self.form)
+
+        # self.setHeader("Content-Type", "text/xml")
+        self.setHeader("Content-Length", str(len(output)))
+        self.write(output)
+
     def add_ok_message(self, msg, escape=True):
         add_message(self._ok_message, msg, escape)
 
@@ -1165,7 +1195,7 @@
         if name is None:
             name = 'home'
 
-        tplname = name     
+        tplname = name
         if view:
             tplname = '%s.%s' % (name, view)
 
diff --git a/roundup/rest.py b/roundup/rest.py
new file mode 100644
--- /dev/null
+++ b/roundup/rest.py
@@ -0,0 +1,551 @@
+"""
+Restful API for Roundup
+
+This module is free software, you may redistribute it
+and/or modify under the same terms as Python.
+"""
+
+import urlparse
+import os
+import json
+import pprint
+import sys
+import time
+import traceback
+import re
+
+from roundup import hyperdb
+from roundup import date
+from roundup import actions
+from roundup.exceptions import *
+from roundup.cgi.exceptions import *
+
+
+def _data_decorator(func):
+    """Wrap the returned data into an object."""
+    def format_object(self, *args, **kwargs):
+        # get the data / error from function
+        try:
+            code, data = func(self, *args, **kwargs)
+        except NotFound, msg:
+            code = 404
+            data = msg
+        except IndexError, msg:
+            code = 404
+            data = msg
+        except Unauthorised, msg:
+            code = 403
+            data = msg
+        except UsageError, msg:
+            code = 400
+            data = msg
+        except (AttributeError, Reject), msg:
+            code = 405
+            data = msg
+        except ValueError, msg:
+            code = 409
+            data = msg
+        except NotImplementedError:
+            code = 402  # nothing to pay, just a mark for debugging purpose
+            data = 'Method under development'
+        except:
+            exc, val, tb = sys.exc_info()
+            code = 400
+            ts = time.ctime()
+            data = '%s: An error occurred. Please check the server log' \
+                   ' for more information.' % ts
+            # out to the logfile
+            print 'EXCEPTION AT', ts
+            traceback.print_exc()
+
+        # decorate it
+        self.client.response_code = code
+        if code >= 400:  # any error require error format
+            result = {
+                'error': {
+                    'status': code,
+                    'msg': data
+                }
+            }
+        else:
+            result = {
+                'data': data
+            }
+        return result
+    return format_object
+
+
+def parse_accept_header(accept):
+    """
+    Parse the Accept header *accept*, returning a list with 3-tuples of
+    [(str(media_type), dict(params), float(q_value)),] ordered by q values.
+
+    If the accept header includes vendor-specific types like::
+        application/vnd.yourcompany.yourproduct-v1.1+json
+
+    It will actually convert the vendor and version into parameters and
+    convert the content type into `application/json` so appropriate content
+    negotiation decisions can be made.
+
+    Default `q` for values that are not specified is 1.0
+
+    # Based on https://gist.github.com/samuraisam/2714195
+    # Also, based on a snipped found in this project:
+    #   https://github.com/martinblech/mimerender
+    """
+    result = []
+    for media_range in accept.split(","):
+        parts = media_range.split(";")
+        media_type = parts.pop(0).strip()
+        media_params = []
+        # convert vendor-specific content types into something useful (see
+        # docstring)
+        typ, subtyp = media_type.split('/')
+        # check for a + in the sub-type
+        if '+' in subtyp:
+            # if it exists, determine if the subtype is a vendor-specific type
+            vnd, sep, extra = subtyp.partition('+')
+            if vnd.startswith('vnd'):
+                # and then... if it ends in something like "-v1.1" parse the
+                # version out
+                if '-v' in vnd:
+                    vnd, sep, rest = vnd.rpartition('-v')
+                    if len(rest):
+                        # add the version as a media param
+                        try:
+                            version = media_params.append(('version',
+                                                           float(rest)))
+                        except ValueError:
+                            version = 1.0  # could not be parsed
+                # add the vendor code as a media param
+                media_params.append(('vendor', vnd))
+                # and re-write media_type to something like application/json so
+                # it can be used usefully when looking up emitters
+                media_type = '{}/{}'.format(typ, extra)
+        q = 1.0
+        for part in parts:
+            (key, value) = part.lstrip().split("=", 1)
+            key = key.strip()
+            value = value.strip()
+            if key == "q":
+                q = float(value)
+            else:
+                media_params.append((key, value))
+        result.append((media_type, dict(media_params), q))
+    result.sort(lambda x, y: -cmp(x[2], y[2]))
+    return result
+
+
+class Routing(object):
+    __route_map = {}
+    __var_to_regex = re.compile(r"<:(\w+)>")
+    url_to_regex = r"([\w.\-~!$&'()*+,;=:@\%%]+)"
+
+    @classmethod
+    def route(cls, rule, methods='GET'):
+        """A decorator that is used to register a view function for a
+        given URL rule:
+            @self.route('/')
+            def index():
+                return 'Hello World'
+
+        rest/ will be added to the beginning of the url string
+
+        Args:
+            rule (string): the URL rule
+            methods (string or tuple or list): the http method
+        """
+        # strip the '/' character from rule string
+        rule = rule.strip('/')
+
+        # add 'rest/' to the rule string
+        if not rule.startswith('rest/'):
+            rule = '^rest/' + rule + '$'
+
+        if isinstance(methods, basestring):  # convert string to tuple
+            methods = (methods,)
+        methods = set(item.upper() for item in methods)
+
+        # convert a rule to a compiled regex object
+        # so /data/<:class>/<:id> will become
+        #    /data/([charset]+)/([charset]+)
+        # and extract the variable names to a list [(class), (id)]
+        func_vars = cls.__var_to_regex.findall(rule)
+        rule = re.compile(cls.__var_to_regex.sub(cls.url_to_regex, rule))
+
+        # then we decorate it:
+        # route_map[regex][method] = func
+        def decorator(func):
+            rule_route = cls.__route_map.get(rule, {})
+            func_obj = {
+                'func': func,
+                'vars': func_vars
+            }
+            for method in methods:
+                rule_route[method] = func_obj
+            cls.__route_map[rule] = rule_route
+            return func
+        return decorator
+
+    @classmethod
+    def execute(cls, instance, path, method, input):
+        # format the input
+        path = path.strip('/').lower()
+        method = method.upper()
+
+        # find the rule match the path
+        # then get handler match the method
+        for path_regex in cls.__route_map:
+            match_obj = path_regex.match(path)
+            if match_obj:
+                try:
+                    func_obj = cls.__route_map[path_regex][method]
+                except KeyError:
+                    raise Reject('Method %s not allowed' % method)
+
+                # retrieve the vars list and the function caller
+                list_vars = func_obj['vars']
+                func = func_obj['func']
+
+                # zip the varlist into a dictionary, and pass it to the caller
+                args = dict(zip(list_vars, match_obj.groups()))
+                args['input'] = input
+                return func(instance, **args)
+        raise NotFound('Nothing matches the given URI')
+
+
+class RestfulInstance(object):
+    """The RestfulInstance performs REST request from the client"""
+
+    __default_patch_op = "replace"  # default operator for PATCH method
+    __accepted_content_type = {
+        "application/json": "json",
+        "*/*": "json"
+        # "application/xml": "xml"
+    }
+    __default_accept_type = "json"
+
+    def __init__(self, client, db):
+        self.client = client
+        self.db = db
+        self.translator = client.translator
+        self.actions = client.instance.actions.copy()
+        self.actions.update({'retire': actions.Retire})
+
+        protocol = 'http'
+        host = self.client.env['HTTP_HOST']
+        tracker = self.client.env['TRACKER_NAME']
+        self.base_path = '%s://%s/%s/rest' % (protocol, host, tracker)
+        self.data_path = self.base_path + '/data'
+
+    def error_obj(self, status, msg, source=None):
+        """Return an error object"""
+        self.client.response_code = status
+        result = {
+            'error': {
+                'status': status,
+                'msg': msg
+            }
+        }
+        if source is not None:
+            result['error']['source'] = source
+
+        return result
+
+    @Routing.route("/data/<:class_name>", 'GET')
+    @_data_decorator
+    def get_collection(self, class_name, input):
+        """GET resource from class URI.
+
+        This function returns only items have View permission
+        class_name should be valid already
+
+        Args:
+            class_name (string): class name of the resource (Ex: issue, msg)
+            input (list): the submitted form of the user
+
+        Returns:
+            int: http status code 200 (OK)
+            list: list of reference item in the class
+                id: id of the object
+                link: path to the object
+        """
+        if class_name not in self.db.classes:
+            raise NotFound('Class %s not found' % self.db.classes)
+        if not self.db.security.hasPermission(
+            'View', self.db.getuid(), class_name
+        ):
+            raise Unauthorised('Permission to view %s denied' % class_name)
+
+        class_obj = self.db.getclass(class_name)
+        class_path = '%s/%s/' % (self.data_path, class_name)
+
+        # Handle filtering and pagination
+        filter_props = {}
+        page = {
+            'size': None,
+            'index': None
+        }
+        for form_field in input.value:
+            key = form_field.name
+            value = form_field.value
+            if key.startswith("where_"):  # serve the filter purpose
+                key = key[6:]
+                filter_props[key] = [
+                    getattr(self.db, key).lookup(p)
+                    for p in value.split(",")
+                ]
+            elif key.startswith("page_"):  # serve the paging purpose
+                key = key[5:]
+                value = int(value)
+                page[key] = value
+
+        if not filter_props:
+            obj_list = class_obj.list()
+        else:
+            obj_list = class_obj.filter(None, filter_props)
+
+        # extract result from data
+        result = [
+            {'id': item_id, 'link': class_path + item_id}
+            for item_id in obj_list
+            if self.db.security.hasPermission(
+                'View', self.db.getuid(), class_name, itemid=item_id
+            )
+        ]
+
+        # pagination
+        if page['size'] is not None and page['index'] is not None:
+            page_start = max(page['index'] * page['size'], 0)
+            page_end = min(page_start + page['size'], len(result))
+            result = result[page_start:page_end]
+
+        self.client.setHeader("X-Count-Total", str(len(result)))
+        return 200, result
+
+    @Routing.route("/data/<:class_name>/<:item_id>", 'GET')
+    @_data_decorator
+    def get_element(self, class_name, item_id, input):
+        """GET resource from object URI.
+
+        This function returns only properties have View permission
+        class_name and item_id should be valid already
+
+        Args:
+            class_name (string): class name of the resource (Ex: issue, msg)
+            item_id (string): id of the resource (Ex: 12, 15)
+            input (list): the submitted form of the user
+
+        Returns:
+            int: http status code 200 (OK)
+            dict: a dictionary represents the object
+                id: id of the object
+                type: class name of the object
+                link: link to the object
+                attributes: a dictionary represent the attributes of the object
+        """
+        if class_name not in self.db.classes:
+            raise NotFound('Class %s not found' % class_name)
+        if not self.db.security.hasPermission(
+            'View', self.db.getuid(), class_name, itemid=item_id
+        ):
+            raise Unauthorised(
+                'Permission to view %s/%s denied' % (class_name, item_id)
+            )
+
+        class_obj = self.db.getclass(class_name)
+        props = None
+        for form_field in input.value:
+            key = form_field.name
+            value = form_field.value
+            if key == "fields":
+                props = value.split(",")
+
+        if props is None:
+            props = class_obj.properties.keys()
+
+        props.sort()  # sort properties
+
+        try:
+            result = [
+                (prop_name, class_obj.get(item_id, prop_name))
+                for prop_name in props
+                if self.db.security.hasPermission(
+                    'View', self.db.getuid(), class_name, prop_name, item_id
+                )
+            ]
+        except KeyError, msg:
+            raise UsageError("%s field not valid" % msg)
+        result = {
+            'id': item_id,
+            'type': class_name,
+            'link': '%s/%s/%s' % (self.data_path, class_name, item_id),
+            'attributes': dict(result)
+        }
+
+        return 200, result
+
+    @Routing.route("/data/<:class_name>/<:item_id>/<:attr_name>", 'GET')
+    @_data_decorator
+    def get_attribute(self, class_name, item_id, attr_name, input):
+        """GET resource from attribute URI.
+
+        This function returns only attribute has View permission
+        class_name should be valid already
+
+        Args:
+            class_name (string): class name of the resource (Ex: issue, msg)
+            item_id (string): id of the resource (Ex: 12, 15)
+            attr_name (string): attribute of the resource (Ex: title, nosy)
+            input (list): the submitted form of the user
+
+        Returns:
+            int: http status code 200 (OK)
+            list: a dictionary represents the attribute
+                id: id of the object
+                type: class name of the attribute
+                link: link to the attribute
+                data: data of the requested attribute
+        """
+        if class_name not in self.db.classes:
+            raise NotFound('Class %s not found' % class_name)
+        if not self.db.security.hasPermission(
+            'View', self.db.getuid(), class_name, attr_name, item_id
+        ):
+            raise Unauthorised(
+                'Permission to view %s/%s %s denied' %
+                (class_name, item_id, attr_name)
+            )
+
+        class_obj = self.db.getclass(class_name)
+        data = class_obj.get(item_id, attr_name)
+        result = {
+            'id': item_id,
+            'type': type(data),
+            'link': "%s/%s/%s/%s" %
+                    (self.data_path, class_name, item_id, attr_name),
+            'data': data
+        }
+
+        return 200, result
+
+    @Routing.route("/data/<:class_name>", 'OPTIONS')
+    @_data_decorator
+    def options_collection(self, class_name, input):
+        """OPTION return the HTTP Header for the class uri
+
+        Returns:
+            int: http status code 204 (No content)
+            body (string): an empty string
+        """
+        if class_name not in self.db.classes:
+            raise NotFound('Class %s not found' % class_name)
+        return 204, ""
+
+    @Routing.route("/data/<:class_name>/<:item_id>", 'OPTIONS')
+    @_data_decorator
+    def options_element(self, class_name, item_id, input):
+        """OPTION return the HTTP Header for the object uri
+
+        Returns:
+            int: http status code 204 (No content)
+            body (string): an empty string
+        """
+        if class_name not in self.db.classes:
+            raise NotFound('Class %s not found' % class_name)
+        self.client.setHeader(
+            "Accept-Patch",
+            "application/x-www-form-urlencoded, multipart/form-data"
+        )
+        return 204, ""
+
+    @Routing.route("/data/<:class_name>/<:item_id>/<:attr_name>", 'OPTIONS')
+    @_data_decorator
+    def option_attribute(self, class_name, item_id, attr_name, input):
+        """OPTION return the HTTP Header for the attribute uri
+
+        Returns:
+            int: http status code 204 (No content)
+            body (string): an empty string
+        """
+        if class_name not in self.db.classes:
+            raise NotFound('Class %s not found' % class_name)
+        self.client.setHeader(
+            "Accept-Patch",
+            "application/x-www-form-urlencoded, multipart/form-data"
+        )
+        return 204, ""
+
+    def dispatch(self, method, uri, input):
+        """format and process the request"""
+        # if X-HTTP-Method-Override is set, follow the override method
+        headers = self.client.request.headers
+        method = headers.getheader('X-HTTP-Method-Override') or method
+
+        # parse Accept header and get the content type
+        accept_header = parse_accept_header(headers.getheader('Accept'))
+        accept_type = "invalid"
+        for part in accept_header:
+            if part[0] in self.__accepted_content_type:
+                accept_type = self.__accepted_content_type[part[0]]
+
+        # get the request format for response
+        # priority : extension from uri (/rest/issue.json),
+        #            header (Accept: application/json, application/xml)
+        #            default (application/json)
+        ext_type = os.path.splitext(urlparse.urlparse(uri).path)[1][1:]
+        data_type = ext_type or accept_type or self.__default_accept_type
+
+        # check for pretty print
+        try:
+            pretty_output = input['pretty'].value.lower() == "true"
+        except KeyError:
+            pretty_output = False
+
+        # add access-control-allow-* to support CORS
+        self.client.setHeader("Access-Control-Allow-Origin", "*")
+        self.client.setHeader(
+            "Access-Control-Allow-Headers",
+            "Content-Type, Authorization, X-HTTP-Method-Override"
+        )
+        self.client.setHeader(
+            "Allow",
+            "HEAD, OPTIONS, GET"
+        )
+        self.client.setHeader(
+            "Access-Control-Allow-Methods",
+            "HEAD, OPTIONS, GET"
+        )
+
+        # Call the appropriate method
+        try:
+            output = Routing.execute(self, uri, method, input)
+        except NotFound, msg:
+            output = self.error_obj(404, msg)
+        except Reject, msg:
+            output = self.error_obj(405, msg)
+
+        # Format the content type
+        if data_type.lower() == "json":
+            self.client.setHeader("Content-Type", "application/json")
+            if pretty_output:
+                indent = 4
+            else:
+                indent = None
+            output = RoundupJSONEncoder(indent=indent).encode(output)
+        else:
+            self.client.response_code = 406
+            output = "Content type is not accepted by client"
+
+        return output
+
+
+class RoundupJSONEncoder(json.JSONEncoder):
+    """RoundupJSONEncoder overrides the default JSONEncoder to handle all
+    types of the object without returning any error"""
+    def default(self, obj):
+        try:
+            result = json.JSONEncoder.default(self, obj)
+        except TypeError:
+            result = str(obj)
+        return result
diff --git a/roundup/scripts/roundup_server.py b/roundup/scripts/roundup_server.py
--- a/roundup/scripts/roundup_server.py
+++ b/roundup/scripts/roundup_server.py
@@ -251,7 +251,7 @@
         else:
             return self.run_cgi()
 
-    do_GET = do_POST = do_HEAD = run_cgi_outer
+    do_GET = do_POST = do_HEAD = do_PUT = do_DELETE = do_PATCH = do_OPTIONS = run_cgi_outer
 
     def index(self):
         ''' Print up an index of the available trackers
diff --git a/test/test_rest.py b/test/test_rest.py
new file mode 100644
--- /dev/null
+++ b/test/test_rest.py
@@ -0,0 +1,301 @@
+import unittest
+import os
+import shutil
+import errno
+
+from roundup.cgi.exceptions import *
+from roundup import password, hyperdb
+from roundup.rest import RestfulInstance
+from roundup.backends import list_backends
+from roundup.cgi import client
+import random
+
+import db_test_base
+
+NEEDS_INSTANCE = 1
+
+
+class TestCase(unittest.TestCase):
+
+    backend = None
+
+    def setUp(self):
+        self.dirname = '_test_rest'
+        # set up and open a tracker
+        self.instance = db_test_base.setupTracker(self.dirname, self.backend)
+
+        # open the database
+        self.db = self.instance.open('admin')
+
+        # Get user id (user4 maybe). Used later to get data from db.
+        self.joeid = self.db.user.create(
+            username='joe',
+            password=password.Password('random'),
+            address='random at home.org',
+            realname='Joe Random',
+            roles='User'
+        )
+
+        self.db.commit()
+        self.db.close()
+
+        env = {
+            'PATH_INFO': 'http://localhost/rounduptest/rest/',
+            'HTTP_HOST': 'localhost',
+            'TRACKER_NAME': 'rounduptest'
+        }
+        self.dummy_client = client.Client(self.instance, None, env, [], None)
+        self.empty_form = cgi.FieldStorage()
+
+    def tearDown(self):
+        self.db.close()
+        try:
+            shutil.rmtree(self.dirname)
+        except OSError, error:
+            if error.errno not in (errno.ENOENT, errno.ESRCH):
+                raise
+
+    def open(self, user='joe'):
+        """
+        Opens database as given user.
+        """
+        self.db = self.instance.open(user)
+
+        self.db.tx_Source = 'web'
+
+        self.db.issue.addprop(tx_Source=hyperdb.String())
+        self.db.msg.addprop(tx_Source=hyperdb.String())
+
+        self.db.post_init()
+
+        thisdir = os.path.dirname(__file__)
+        vars = {}
+        execfile(os.path.join(thisdir, "tx_Source_detector.py"), vars)
+        vars['init'](self.db)
+        self.server = RestfulInstance(self.dummy_client, self.db)
+
+    def testGetSelf(self):
+        """
+        Retrieve all three users
+        obtain data for 'joe'
+        """
+        self.open()
+        # Retrieve all three users.
+        results = self.server.get_collection('user', self.empty_form)
+        self.assertEqual(self.dummy_client.response_code, 200)
+        self.assertEqual(len(results['data']), 3)
+
+        # Obtain data for 'joe'.
+        results = self.server.get_element('user', self.joeid, self.empty_form)
+        results = results['data']
+        self.assertEqual(self.dummy_client.response_code, 200)
+        self.assertEqual(results['attributes']['username'], 'joe')
+        self.assertEqual(results['attributes']['realname'], 'Joe Random')
+
+        # Obtain data for 'joe'.
+        results = self.server.get_attribute(
+            'user', self.joeid, 'username', self.empty_form
+        )
+        self.assertEqual(self.dummy_client.response_code, 200)
+        self.assertEqual(results['data']['data'], 'joe')
+
+    def testGetAdmin(self):
+        """
+        Read admin data.
+        """
+        self.open()
+        results = self.server.get_element('user', '1', self.empty_form)
+        self.assertEqual(self.dummy_client.response_code, 200)
+        self.assertFalse(results['data']['attributes'].has_key('openids'))
+        self.assertFalse(results['data']['attributes'].has_key('password'))
+
+    def testGetSelfAttribute(self):
+        """
+        Read admin data.
+        """
+        self.open()
+        results = self.server.get_attribute('user', self.joeid, 'password', self.empty_form)
+        self.assertEqual(self.dummy_client.response_code, 200)
+        self.assertIsNotNone(results['data']['data'])
+
+    def testGetAdminAttribute(self):
+        """
+        Read admin data.
+        """
+        self.open()
+        # Retrieve all three users.
+        results = self.server.get_attribute('user', '1', 'password', self.empty_form)
+        self.assertEqual(self.dummy_client.response_code, 403)
+
+    def testGetAnonymous(self):
+        """
+        Anonymous should not get users.
+        """
+        self.open('anonymous')
+        results = self.server.get_collection('user', self.empty_form)
+        self.assertEqual(self.dummy_client.response_code, 403)
+
+    def testFilter(self):
+        """
+        Retrieve all three users
+        obtain data for 'joe'
+        """
+        self.open()
+        # create sample data
+        try:
+            self.db.status.create(name='open')
+        except ValueError:
+            pass
+        try:
+            self.db.status.create(name='closed')
+        except ValueError:
+            pass
+        try:
+            self.db.priority.create(name='normal')
+        except ValueError:
+            pass
+        try:
+            self.db.priority.create(name='critical')
+        except ValueError:
+            pass
+        self.db.issue.create(
+            title='foo4',
+            status=self.db.status.lookup('closed'),
+            priority=self.db.priority.lookup('critical')
+        )
+        self.db.issue.create(
+            title='foo1',
+            status=self.db.status.lookup('open'),
+            priority=self.db.priority.lookup('normal')
+        )
+        issue_open_norm = self.db.issue.create(
+            title='foo2',
+            status=self.db.status.lookup('open'),
+            priority=self.db.priority.lookup('normal')
+        )
+        issue_closed_norm = self.db.issue.create(
+            title='foo3',
+            status=self.db.status.lookup('closed'),
+            priority=self.db.priority.lookup('normal')
+        )
+        issue_closed_crit = self.db.issue.create(
+            title='foo4',
+            status=self.db.status.lookup('closed'),
+            priority=self.db.priority.lookup('critical')
+        )
+        issue_open_crit = self.db.issue.create(
+            title='foo5',
+            status=self.db.status.lookup('open'),
+            priority=self.db.priority.lookup('critical')
+        )
+        base_path = self.dummy_client.env['PATH_INFO'] + 'data/issue/'
+
+        # Retrieve all issue status=open
+        form = cgi.FieldStorage()
+        form.list = [
+            cgi.MiniFieldStorage('where_status', 'open')
+        ]
+        results = self.server.get_collection('issue', form)
+        self.assertEqual(self.dummy_client.response_code, 200)
+        self.assertIn(get_obj(base_path, issue_open_norm), results['data'])
+        self.assertIn(get_obj(base_path, issue_open_crit), results['data'])
+        self.assertNotIn(
+            get_obj(base_path, issue_closed_norm), results['data']
+        )
+
+        # Retrieve all issue status=closed and priority=critical
+        form = cgi.FieldStorage()
+        form.list = [
+            cgi.MiniFieldStorage('where_status', 'closed'),
+            cgi.MiniFieldStorage('where_priority', 'critical')
+        ]
+        results = self.server.get_collection('issue', form)
+        self.assertEqual(self.dummy_client.response_code, 200)
+        self.assertIn(get_obj(base_path, issue_closed_crit), results['data'])
+        self.assertNotIn(get_obj(base_path, issue_open_crit), results['data'])
+        self.assertNotIn(
+            get_obj(base_path, issue_closed_norm), results['data']
+        )
+
+        # Retrieve all issue status=closed and priority=normal,critical
+        form = cgi.FieldStorage()
+        form.list = [
+            cgi.MiniFieldStorage('where_status', 'closed'),
+            cgi.MiniFieldStorage('where_priority', 'normal,critical')
+        ]
+        results = self.server.get_collection('issue', form)
+        self.assertEqual(self.dummy_client.response_code, 200)
+        self.assertIn(get_obj(base_path, issue_closed_crit), results['data'])
+        self.assertIn(get_obj(base_path, issue_closed_norm), results['data'])
+        self.assertNotIn(get_obj(base_path, issue_open_crit), results['data'])
+        self.assertNotIn(get_obj(base_path, issue_open_norm), results['data'])
+
+    def testPagination(self):
+        """
+        Retrieve all three users
+        obtain data for 'joe'
+        """
+        self.open()
+        # create sample data
+        for i in range(0, random.randint(5, 10)):
+            self.db.issue.create(title='foo' + str(i))
+
+        # Retrieving all the issues
+        results = self.server.get_collection('issue', self.empty_form)
+        self.assertEqual(self.dummy_client.response_code, 200)
+        total_length = len(results['data'])
+
+        # Pagination will be 70% of the total result
+        page_size = total_length * 70 // 100
+        page_zero_expected = page_size
+        page_one_expected = total_length - page_zero_expected
+
+        # Retrieve page 0
+        form = cgi.FieldStorage()
+        form.list = [
+            cgi.MiniFieldStorage('page_size', page_size),
+            cgi.MiniFieldStorage('page_index', 0)
+        ]
+        results = self.server.get_collection('issue', form)
+        self.assertEqual(self.dummy_client.response_code, 200)
+        self.assertEqual(len(results['data']), page_zero_expected)
+
+        # Retrieve page 1
+        form = cgi.FieldStorage()
+        form.list = [
+            cgi.MiniFieldStorage('page_size', page_size),
+            cgi.MiniFieldStorage('page_index', 1)
+        ]
+        results = self.server.get_collection('issue', form)
+        self.assertEqual(self.dummy_client.response_code, 200)
+        self.assertEqual(len(results['data']), page_one_expected)
+
+        # Retrieve page 2
+        form = cgi.FieldStorage()
+        form.list = [
+            cgi.MiniFieldStorage('page_size', page_size),
+            cgi.MiniFieldStorage('page_index', 2)
+        ]
+        results = self.server.get_collection('issue', form)
+        self.assertEqual(self.dummy_client.response_code, 200)
+        self.assertEqual(len(results['data']), 0)
+
+
+def get_obj(path, id):
+    return {
+        'id': id,
+        'link': path + id
+    }
+
+
+def test_suite():
+    suite = unittest.TestSuite()
+    for l in list_backends():
+        dct = dict(backend=l)
+        subcls = type(TestCase)('TestCase_%s' % l, (TestCase,), dct)
+        suite.addTest(unittest.makeSuite(subcls))
+    return suite
+
+if __name__ == '__main__':
+    runner = unittest.TextTestRunner()
+    unittest.main(testRunner=runner)


More information about the Tracker-discuss mailing list