[Python-checkins] CVS: python/nondist/sftools bug.py,NONE,1.1 buglib.py,NONE,1.1 jitterbuglib.py,NONE,1.1 login.py,NONE,1.1 patchtool.py,NONE,1.1 pest.py,NONE,1.1 sf-schema.txt,NONE,1.1 sfdb.py,NONE,1.1 sflib.py,NONE,1.1 upload.py,NONE,1.1
Fred L. Drake
fdrake@users.sourceforge.net
Wed, 06 Jun 2001 11:29:13 -0700
Update of /cvsroot/python/python/nondist/sftools
In directory usw-pr-cvs1:/tmp/cvs-serv15643
Added Files:
bug.py buglib.py jitterbuglib.py login.py patchtool.py pest.py
sf-schema.txt sfdb.py sflib.py upload.py
Log Message:
Jeremy's original code to work with the SourceForge bug manager from
Python.
--- NEW FILE: bug.py ---
#! /usr/bin/env python
"""Update one or more bugs"""
import buglib
import login
import sfdb
import getopt
import sys
# Feature flags: is each of the following back ends in use?
LOCAL_DATABASE = 1
SOURCEFORGE = 1
# Non-zero turns on progress output.
VERBOSE = 0
# SourceForge credentials; None means "not supplied yet".
LOGIN = None
PASSWORD = None

# Bug fields that can be set from the command line.
FIELDS = ("summary", "bug_group_id", "category_id", "resolution_id",
          "status_id", "assigned_to", "priority", "details")

# Map the long-option spelling of each field ("--summary") to its name.
argnames = dict([('--' + field, field) for field in FIELDS])
def main(bug_ids, updates):
    """Update one or more bugs using the fields in updates.

    bug_ids -- list of SF bug ids (anything int() accepts)
    updates -- keys are SF field names, values are the strings to put
               there

    Each bug is fetched from the local database, modified, flushed and
    then posted to SourceForge through a freshly logged-in session.
    Raises ValueError if a bug id is not in the local database.
    """
    bug_ids = [int(bug) for bug in bug_ids]
    session = login.Login(LOGIN, PASSWORD)
    db = sfdb.BugDBInterface()
    update_ids = translate_updates(db, updates)
    for bug in bug_ids:
        b = db.get(bug)
        if b is None:
            # db.get() returns None for an unknown id; report that
            # clearly instead of failing on None.set() below.
            raise ValueError("bug %d is not in the local database" % bug)
        for k, v in update_ids.items():
            if k == 'assigned_to':
                # the value is a user name, which only assign() resolves
                b.assign(v)
            else:
                b.set(k, v)
        b.flush()
        buglib.update_bug(b.get_form_data(), session)
        if VERBOSE:
            print(bug)
def translate_updates(db, updates):
    """Return a copy of updates with symbolic values turned into ids.

    db      -- object with one mapping attribute per reference table
               (e.g. db.status for the "status_id" field)
    updates -- dictionary of field name to the value given by the user

    Free-text fields, the user name in assigned_to and the numeric
    priority are copied unchanged; every other field is expected to be
    named "<table>_id" and is looked up through the <table> mapping.
    """
    # priority is a plain integer column with no lookup table; without
    # it here, k[:-3] would turn it into the bogus table name "prior".
    verbatim = ('summary', 'assigned_to', 'details', 'priority')
    trans = {}
    for k, v in updates.items():
        if k in verbatim:
            trans[k] = v
            continue
        table = k[:-3]          # strip the trailing "_id"
        mapping = getattr(db, table)
        trans[k] = mapping.lookup(v)
    if VERBOSE:
        print(trans)
    return trans
if __name__ == "__main__":
    # Command line: [-v] [--login=NAME] [--password=PW] [--<field>=VALUE]...
    # followed by one or more bug ids.
    long_opts = ["verbose", "login=", "password="]
    long_opts.extend([field + "=" for field in FIELDS])
    opts, args = getopt.getopt(sys.argv[1:], 'v', long_opts)
    updates = {}
    for k, v in opts:
        if k in ('-v', '--verbose'):
            VERBOSE = 1
        elif k in argnames:
            updates[argnames[k]] = v
        elif k == '--login':
            LOGIN = v
        elif k == '--password':
            PASSWORD = v
    if not args:
        # explicit check: an assert would disappear under "python -O"
        sys.exit("Must specify at least one bug id")
    # Prompt for whatever credentials were not given as options.
    if LOGIN is None:
        LOGIN = raw_input("SF username: ")
    if PASSWORD is None:
        import getpass
        PASSWORD = getpass.getpass("SF Password: ")
    main(args, updates)
--- NEW FILE: buglib.py ---
"""Tools to interact with SF Bug Tracker"""
import formatter
import getpass
import htmllib
import httplib
import urllib
import urlparse
try:
import cPickle
except ImportError:
import pickle
else:
pickle = cPickle
import pg
import login
import sflib
import sfdb
BUG_SUMMARY_URL = "http://sourceforge.net/bugs/?group_id=5470&set=custom&_assigned_to=0&_status=1&_category=100&_bug_group=100&SUBMIT=Browse"
BUG_LOAD_URL = "http://sourceforge.net/bugs/index.php?group_id=5470&func=detailbug&bug_id=%s"
BUG_POST_URL = "http://sourceforge.net/bugs/index.php"
VERBOSE = 1
class BugParser(htmllib.HTMLParser):
__super_init = htmllib.HTMLParser.__init__
VERBOSE = 0
def __init__(self, formatter, verbose=None):
self.__super_init(formatter)
self.fields = {}
self.current_field = None
self.current_field_value = None
self.summary = None
self.bug_id = None
if verbose is not None:
self.VERBOSE = verbose
def start_select(self, attrs):
if self.VERBOSE:
print "SELECT", attrs
for k, v in attrs:
if k == 'name':
self.current_field = v
assert self.current_field is not None
def end_select(self):
self.current_field = None
def start_option(self, attrs):
if self.VERBOSE:
print "OPTION", attrs
assert self.current_field_value is None
selected = 0
for k, v in attrs:
if k == 'value':
value = v
if k == 'selected':
selected = 1
if selected:
self.current_field_value = value
self.save_bgn()
def end_option(self):
if self.current_field_value is not None:
label = self.save_end()
self.fields[self.current_field] = (label,
self.current_field_value)
self.current_field_value = None
def start_input(self, attrs):
summary = 0
bug_id = 0
value = None
for k, v in attrs:
if k == 'name' and v == 'summary':
summary = 1
elif k == 'value':
value = v
elif k == 'name' and v == 'bug_id':
bug_id = 1
if summary:
assert self.summary is None
assert value is not None
self.summary = value
if bug_id:
assert self.bug_id is None
assert value is not None
self.bug_id = value
def get_bug_urls():
    """Return absolute URLs for the 'detailbug' links on the summary page.

    The summary page at BUG_SUMMARY_URL is loaded through
    sflib.SummaryParser; each collected href is joined onto that URL.
    """
    parser = sflib.SummaryParser(BUG_SUMMARY_URL, ('detailbug',))
    parser.load(BUG_SUMMARY_URL)
    # only the first element of each three-element entry is needed
    urls = [urlparse.urljoin(BUG_SUMMARY_URL, href)
            for href, _unused1, _unused2 in parser.hrefs]
    if VERBOSE:
        print("%d bugs found" % len(urls))
    return urls
def http_get(url, session):
try:
f = session.get(url)
buf = f.read()
f.close()
except httplib.HTTPException, err:
print "Error occurred loading %s" % url
print err
print
return None
return buf
def load_bug(url, session, retries=3, delay=10):
    """Load and parse one bug page, retrying failed downloads.

    url     -- URL of the bug detail page
    session -- login session passed on to http_get()
    retries -- number of download attempts (default 3)
    delay   -- seconds to wait between attempts (default 10)

    Returns (bug_id, summary, fields) as collected by BugParser.
    Raises RuntimeError when every attempt fails.
    """
    # time is not imported at module level; the original code raised
    # NameError on the first retry.
    import time
    buf = None
    for attempt in range(retries):
        buf = http_get(url, session)
        if buf is not None:
            break
        if attempt + 1 < retries:
            # no point in sleeping after the final attempt
            time.sleep(delay)
    if buf is None:
        raise RuntimeError("could not load %s" % url)
    p = BugParser(formatter.NullFormatter())
    p.feed(buf)
    return p.bug_id, p.summary, p.fields
# a list of fields in the bug submission form that are currently
# ignored and what value should be used for them on submission
_ignored_fields = [("canned_response", "100"),
("dependent_on_task[]", "100"),
("dependent_on_bug[]", "100"),
]
def update_bug(bug_info, session):
    """Post bug_info to the SF bug tracker as a "modify bug" form.

    bug_info -- dictionary of form field name to value; every value is
                converted with str()
    session  -- the login session returned by login.Login

    Returns whatever session.get() returns for the POST.
    """
    # fixed fields first, so that bug_info and then the ignored-field
    # defaults can override them, in that order
    form = {'func': 'postmodbug',
            'group_id': '5470',
            'submit': 'Submit Changes',
            }
    form.update(dict([(name, str(value))
                      for name, value in bug_info.items()]))
    form.update(dict(_ignored_fields))
    headers = {'Content-Type': 'application/x-www-form-urlencoded'}
    return session.get(BUG_POST_URL, "POST", headers, urllib.urlencode(form))
def make_bug_urls(bugs):
    """Return the detail-page URL for each bug id in bugs."""
    urls = []
    for bug_id in bugs:
        urls.append(BUG_LOAD_URL % bug_id)
    return urls
def main(bugs=None):
    """Copy bugs from SourceForge into the local database.

    bugs -- optional sequence of bug ids; when it is empty or None the
            bugs linked from the summary page are loaded instead

    Bugs already in the database are updated, the others inserted.
    """
    session = login.Login("jhylton", getpass.getpass())
    db = sfdb.BugDBInterface(pg.connect())
    if not bugs:
        urls = get_bug_urls()
    else:
        urls = make_bug_urls(bugs)
    for url in urls:
        if VERBOSE:
            print(url)
        bug_id, summary, fields = load_bug(url, session)
        if VERBOSE:
            print("%s %s" % (bug_id, summary))
            print("")
        if db.exists(bug_id):
            store = db.update
        else:
            store = db.insert
        store(bug_id, summary, fields)


if __name__ == "__main__":
    import sys
    # an empty argument list means the same to main() as no argument
    main(sys.argv[1:] or None)
--- NEW FILE: jitterbuglib.py ---
"""Library for reading Jitterbug records and submitting them to SF
A Jitterbug database has one or more categories of bugs, like
incoming, open, resolved, etc. Each category is represented by a
directory. Each bug is stored in one or more files in that
directory; each bug has a unique id. The files for bug NNN are
* NNN: A mail message containing the bug report
* NNN.notes: Any notes entered via the Jitterbug Web interface
* NNN.audit: A log of changes to the bug record; one per line
* NNN.followup.I: one or more followup mail messages; first one is
numbered 1
* NNN.reply.I: one or more replies entered through the Web form;
first one is numbered 1
This program loads each bug report for a category into the SF Bug
Tracker.
"""
import cgi
import os
import re
import rfc822
import urllib
import urlparse
try:
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO
import sflib
VERBOSE = 0
SF_SUBMIT_URL = "http://sourceforge.net/bugs/index.php"
class BugLabels:
    """SF form values used for every bug that is submitted.

    The values are SF ids kept as strings; PROJECT_GROUP_ID has no
    default.
    """
    PROJECT_GROUP_ID = None
    ASSIGNMENT = "100"
    CATEGORY = "100"
    GROUP = "100"
    PRIORITY = "5"


def set_label(label, val):
    """Set the BugLabels attribute named label to val."""
    setattr(BugLabels, label, val)
class Message:
def __init__(self, path):
self.path = path
f = open(path)
self.msg = rfc822.Message(f)
self.body = self.msg.fp.read()
f.close()
def __getattr__(self, attr):
return getattr(self.msg, attr)
def dump(self):
"""Return a string with a minimal copy of the message"""
headers = []
for field in "From", "Subject", "Date":
val = self.msg.getheader(field)
if val:
headers.append("%s: %s" % (field, val))
return "\n".join(headers) + "\n\n" + self.body
class Notes:
    """Hold a notes string behind a dump() method."""

    def __init__(self, buf):
        self.buf = buf

    def dump(self):
        """Return the notes text unchanged."""
        return self.buf
class Bug:
    """A Jitterbug bug record loaded from its files on disk.

    Attributes set while loading:
    id, dir, root     -- bug id, category directory, path of the root file
    msg               -- rfc822.Message for the outer mail headers
    msg_headers       -- rfc822.Message for the extra headers at the
                         start of the mail body
    msg_body          -- the text following those extra headers
    audit, notes      -- contents of the .audit / .notes file, or None
    followups, replys -- lists of Message objects in numeric order
    """

    def __init__(self, dir, bug_id):
        self.id = bug_id
        self.dir = dir
        self.root = os.path.join(dir, bug_id)
        self._load()

    def _load(self):
        self._load_root()
        self._load_audit()
        self._load_followups()
        self._load_replys()
        self._load_notes()

    def _load_root(self):
        f = open(self.root)
        try:
            self.msg = rfc822.Message(f)
            # The body of a Jitterbug mail message has four more rfc822
            # headers.  Parse these as another message object, then get
            # the real body out of the second Message object.
            g = StringIO(self.msg.fp.read())
            try:
                self.msg_headers = rfc822.Message(g)
                self.msg_body = self.msg_headers.fp.read()
            finally:
                g.close()
        finally:
            f.close()

    def _read_optional(self, suffix):
        """Return the contents of root + suffix, or None if it is absent."""
        path = self.root + suffix
        if not os.path.exists(path):
            return None
        f = open(path)
        try:
            return f.read()
        finally:
            f.close()

    def _load_audit(self):
        self.audit = self._read_optional(".audit")

    def _load_notes(self):
        self.notes = self._read_optional(".notes")

    def _load_numbered(self, name):
        """Return the Messages stored as <id>.<name>.<N>, ordered by N."""
        # Escape the pieces and anchor the end so that only names of
        # exactly this form match; the dots used to match any character
        # and trailing junk (e.g. "12.reply.1~") was accepted.
        rx = re.compile(r"%s\.%s\.(\d+)$"
                        % (re.escape(self.id), re.escape(name)))
        elts = {}
        for file in os.listdir(self.dir):
            mo = rx.match(file)
            if mo:
                elts[int(mo.group(1))] = Message(os.path.join(self.dir, file))
        numbers = list(elts.keys())
        numbers.sort()
        return [elts[n] for n in numbers]

    def _load_followups(self):
        self.followups = self._load_numbered('followup')

    def _load_replys(self):
        self.replys = self._load_numbered('reply')

    def dump(self, io):
        """Write a text rendering of the bug report to the file io."""
        template = """Jitterbug-Id: %(jid)s
Submitted-By: %(sender)s
Date: %(date)s
Version: %(version)s
OS: %(os)s
%(body)s
====================================================================
Audit trail:
%(audit)s
"""
        values = {
            'jid': self.id,
            'sender': self.msg.getheader('from'),
            'date': self.msg.getheader('date'),
            'version': self.msg_headers.getheader('version'),
            'os': self.msg_headers.getheader('os'),
            'body': self.msg_body,
            'audit': self.audit,
        }
        io.write(template % values)

    def submit(self, url):
        """Submit the bug, then its replies, followups and notes, to url."""
        buf = self.submit_initial(url)
        if not (self.followups or self.replys or self.notes):
            if VERBOSE:
                print("Done")
            return
        # find the SF bug id and post comments for each reply or
        # followup
        self._load_bug_summary(url, buf)
        self.submit_followups(url)

    def submit_initial(self, url):
        """POST the initial report to url and return the response body."""
        if VERBOSE:
            print("Submitting bug PR#%s" % self.id)
        data = self.encode_initial_bug_report()
        f = urllib.urlopen(url, data)
        try:
            return f.read()
        finally:
            f.close()

    def _post(self, url, data):
        """POST data to url, closing the response without reading it."""
        urllib.urlopen(url, data).close()

    def submit_followups(self, url):
        """Post each reply, followup and the notes as a comment on the bug."""
        bug_id = self.find_bug_id(url)
        if bug_id is None:
            print("Error entering bug PR#%s" % self.id)
            return
        i = 0
        for msg in self.replys + self.followups:
            i = i + 1
            data = self.encode_followup_comment(bug_id, msg)
            if VERBOSE:
                print("Submitting followup/reply %d" % i)
            self._post(url, data)
        if self.notes:
            if VERBOSE:
                print("Submitting notes")
            data = self.encode_followup_comment(bug_id, Notes(self.notes))
            self._post(url, data)

    def find_bug_id(self, url):
        """Return the SF bug id recorded for this bug, or None."""
        try:
            return self._bsp.get_sf_bug_id(self.id)
        except KeyError:
            return None

    def _load_bug_summary(self, url, buf):
        """Load a bug summary starting with the HTML response in buf"""
        self._bsp = BugSummaryParser()
        self._bsp.parse(buf)

    def encode_initial_bug_report(self):
        """Return the urlencoded form that creates the bug on SF."""
        # the form vars definitions are defined by the form used to
        # submit bugs on SF
        form_vars = {'func': 'postaddbug',
                     'group_id': BugLabels.PROJECT_GROUP_ID,
                     'category': BugLabels.CATEGORY,
                     'bug_group_id': BugLabels.GROUP,
                     'priority': BugLabels.PRIORITY,
                     'assigned_to': BugLabels.ASSIGNMENT,
                     }
        form_vars['summary'] = "%s (PR#%s)" % \
                               (self.msg.getheader('subject'), self.id)
        collector = StringIO()
        self.dump(collector)
        collector.seek(0, 0)
        form_vars['details'] = collector.read().strip()
        return urllib.urlencode(form_vars)

    def encode_followup_comment(self, bug_id, msg):
        """Return the urlencoded form that adds msg.dump() as a comment."""
        form_vars = {'func': 'postaddcomment',
                     'group_id': BugLabels.PROJECT_GROUP_ID,
                     'bug_id': bug_id,
                     'details': msg.dump(),
                     }
        return urllib.urlencode(form_vars)
class BugSummaryParser:
    """Find SF bug ids on the bug summary page from sourceforge

    The specific intent of this class is to extract the SF bug id
    associated with a newly entered Jitterbug record.  The Jitterbug
    record is identified by the (PR#NNN) string on the line holding
    the bug's link.  This requires that each bug is on its own line in
    the HTML table.  If SF changes its output, all bets are off.
    """
    rx_pr = re.compile(r'\(PR#(\d+)\)')

    def __init__(self):
        # maps Jitterbug id (string) to SF bug id
        self.bugs = {}
        self.parser = sflib.SummaryParser(SF_SUBMIT_URL, ('detailbug',))

    def get_sf_bug_id(self, pr_id):
        """Return the SF bug id for pr_id; raises KeyError if unknown."""
        return self.bugs[pr_id]

    def parse(self, buf):
        """Parse the HTML in buf and record the bugs found in it."""
        self.parser.parse(buf)
        self._load_hrefs()

    def _load_hrefs(self):
        """Load hrefs from the parser object into self.bugs"""
        for entry in self.parser.get_hrefs():
            href, query_dict, line = entry
            mo = self.rx_pr.search(line)
            if mo:
                self.bugs[mo.group(1)] = query_dict['bug_id']
--- NEW FILE: login.py ---
import httplib
import re
import urllib
SF_HOST = "sourceforge.net"
FORM_URL = "http://sourceforge.net/account/login.php"
test_url = "http://sourceforge.net/bugs/?func=detailbug&group_id=5470&bug_id=112628"
class Session:
    """A logged-in SourceForge session.

    The constructor posts the login form and keeps the session_hash
    cookie from the response; get() sends that cookie with each request.
    """

    def __init__(self, username, password):
        """Log in; raises ValueError if no session cookie is obtained."""
        self.session_hash = None
        self._do_login(username, password)
        if self.session_hash is None:
            raise ValueError("invalid username and password")

    def _do_login(self, username, password):
        form = {'return_to': '',
                'form_loginname': username,
                'form_pw': password,
                'stay_in_ssl': '1',
                'login': 'Login',
                }
        query = urllib.urlencode(form)
        headers = {'Content-Type': 'application/x-www-form-urlencoded',
                   }
        c = httplib.HTTPConnection(SF_HOST)
        try:
            c.connect()
            c.request('POST', FORM_URL, query, headers)
            resp = c.getresponse()
            cookie = resp.msg.getheader('set-cookie')
            if cookie is None:
                raise ValueError("invalid name/password: %s" % resp.read())
            self.session_hash = self._get_session_hash(cookie)
        finally:
            # only the cookie is needed, so the connection can go
            c.close()

    _rx_sess_hash = re.compile('(session_hash=[a-z0-9]+);')

    def _get_session_hash(self, cookie):
        """Return the "session_hash=..." part of a Set-Cookie value."""
        mo = self._rx_sess_hash.search(cookie)
        if mo:
            return mo.group(1)
        raise ValueError("could not find session_hash in %s" % repr(cookie))

    def get(self, url, method="GET", headers=None, body=None):
        """Send a request carrying the session cookie; return the response.

        headers -- optional extra request headers (may override Cookie)
        body    -- optional request body

        The connection is left open so the caller can read the response.
        """
        c = httplib.HTTPConnection(SF_HOST)
        c.set_debuglevel(1)
        c.connect()
        _headers = {'Cookie': self.session_hash}
        if headers:
            _headers.update(headers)
        if body:
            c.request(method, url, body, _headers)
        else:
            c.request(method, url, headers=_headers)
        return c.getresponse()


def Login(username, password):
    """Return a new Session logged in as username."""
    return Session(username, password)
--- NEW FILE: patchtool.py ---
"""Screen scraper for Patch Manager interface
The patch form URL is http://www.sourceforge.net/patch/index.php.
GET method
If I'm lucky, it can be used without authentication.
the input fields are: (* means hidden field)
*group_id=5470
*custom=set
_assigned_to=None
_status=None
This script produces the following HTML for each entry:
<TR BGCOLOR='#FFFFFF'>
<TD><A HREF='?func=detailpatch&patch_id=100518&group_id=5470'>100518</A>
</TD>
<TD>fix bltinmodule.c for 64-bit platforms</TD>
<TD>2000-Jun-07 03:21</TD>
<TD>gvanrossum</TD>
<TD>tmick</TD></TR>
If there are more than 50 patches, the following HTML is produced:
<TR><TD COLSPAN='2'> </TD><TD> </TD><TD COLSPAN='2'><A HREF='?func=browse&group_id=5470&set=custom&_assigned_to=100&_status=100&offset=50'><B>Next 50 --></B></A></TD></TR></TABLE> <!-- end content -->
Future plans:
support authentication
command-line interface for modifying patches
"""
import cgi
import re
import types
from urllib import urlencode
from urlparse import urljoin
from urllib import urlopen
import pg
from sfdb import PatchDBInterface
VERBOSE = 0
DATABASE = None
class PatchListParser:
    """Minimal re-based parser that grabs relevant URLs from summary"""

    # Accept either quoting style around the HREF value; the sample
    # output quoted in the module docstring uses single quotes.
    rx_href = re.compile(r'''HREF=["']([?/=&_A-Za-z0-9]+)["']''')

    def parse_hrefs(self, buf):
        """Return the value of every matching HREF in buf, in order."""
        return self.rx_href.findall(buf)

    def get_query_hrefs(self, buf):
        """Return only the hrefs that consist of a query ("?...")."""
        return [href for href in self.parse_hrefs(buf)
                if href.startswith('?')]
class PatchParser:
    """Minimal re-based parser that pulls key-values from patch page"""

    # The key may not contain markup and the value stops at the first
    # </TD>; the greedy ".+" groups used to swallow several cells when
    # more than one of them shared a line.
    rx_entry = re.compile('<TD[^>]*><B>([^<]+?):</B><BR>(.+?)</TD>')

    def parse(self, buf):
        """Return a dict of the "<B>key:</B><BR>value" cells in buf.

        When a key occurs more than once the last value wins.
        """
        return dict(self.rx_entry.findall(buf))
def urldecode(query):
d = cgi.parse_qs(query)
for k, v in d.items():
if len(v) != 1:
raise ValueError, "unexpected duplicate entry"
d[k] = v[0]
return d
class PatchManager:
    """Load patch summaries and details from the SF Patch Manager."""

    url = "http://www.sourceforge.net/patch/index.php"
    group_id = 5470
    list_parser = PatchListParser()
    patch_parser = PatchParser()

    # XXX to get the right numeric values for assigned_to and status,
    # would need to scrape them out of the form...
    def get_patches(self, assigned_to='0', status='0'):
        """Return a dict mapping patch id (int) to its parsed detail page.

        assigned_to, status -- SF ids, as strings, used to filter the list
        """
        assert type(assigned_to) == types.StringType
        assert type(status) == types.StringType
        url = self._get_initial_query(assigned_to, status)
        patch_list = self._load_patch_summary(url)
        patches = {}
        for patch_id, p in patch_list:
            patches[patch_id] = self._load_patch_detail(p)
        return patches

    def _get_initial_query(self, assigned_to, status):
        """Return the URL of the first summary page."""
        fields = {'group_id': self.group_id,
                  'set': 'custom',
                  'SUBMIT': 'Browse',
                  '_assigned_to': assigned_to,
                  '_status': status,
                  }
        return "%s?%s" % (self.url, urlencode(fields))

    def _fetch(self, url):
        """Return the contents of url, closing the connection afterwards."""
        if VERBOSE:
            print("load %s" % url)
        f = urlopen(url)
        try:
            return f.read()
        finally:
            f.close()

    def _load_patch_summary(self, url):
        """Return (patch_id, detail_url) pairs from every summary page."""
        todo = [(url, 0)]
        queued = {0: 1}         # offsets already scheduled for loading
        patches = []
        while todo:
            url, offset = todo.pop(0)
            buf = self._fetch(url)
            for href in self.list_parser.get_query_hrefs(buf):
                d = urldecode(href[1:])
                # not every link on the page carries these fields
                func = d.get('func')
                if func == 'detailpatch' and 'patch_id' in d:
                    patches.append((int(d['patch_id']),
                                    urljoin(self.url, href)))
                elif func == 'browse' and 'offset' in d:
                    new_offset = int(d['offset'])
                    # only move forward, and load each page once even
                    # if the page links to it several times
                    if new_offset > offset and new_offset not in queued:
                        queued[new_offset] = 1
                        todo.append((urljoin(self.url, href), new_offset))
        return patches

    def _load_patch_detail(self, url):
        """Return the key-value entries parsed from a patch detail page."""
        return self.patch_parser.parse(self._fetch(url))
if __name__ == "__main__":
    # Options: -v for verbose output, -d NAME to pick the database.
    import sys
    import getopt
    opts, args = getopt.getopt(sys.argv[1:], 'vd:')
    assert len(args) == 0
    for k, v in opts:
        if k == '-d':
            DATABASE = v
        elif k == '-v':
            VERBOSE = 1
    pmgr = PatchManager()
    if VERBOSE:
        print("Loading patches")
    p = pmgr.get_patches()
    if VERBOSE:
        print("Retrieved %d patches" % len(p))
        print("Inserting into local database")
    if not DATABASE:
        db = pg.connect()
    else:
        db = pg.connect(DATABASE)
    pdbi = PatchDBInterface(db)
    for p_id, attrs in p.items():
        pdbi.update(p_id, attrs)
    if VERBOSE:
        # every patch that was not replaced counts as new
        new = len(p) - pdbi.num_deletes
        print("Found %d new patches" % new)
        print("Updated %d existing patches" % pdbi.num_deletes)
--- NEW FILE: pest.py ---
#! /usr/bin/env python
"""Pest people about patches that aren't closed"""
import os
import pg
import smtplib
from patchdb import PatchDBInterface
SF_MAIL_SERVER = "ns1.varesearch.com"
VERBOSE = 1
QUERY = "SELECT * FROM patches_t WHERE assigned_to = %d " \
"AND status = %d"
HEADER = " user open accepted\n" \
"---------------+------+-----------"
ENTRY = "%-15.15s|%5d | %6d"
FROM = "Jeremy Hylton <jhylton@users.sourceforge.net>"
MSG_TEMPLATE = """From: Jeremy Hylton <jhylton@users.sourceforge.net>
To: %(user)s
Subject: Open and Accepted patches reminder
This message is automatically generated using the SF Patch Manager
database. The database shows that you have %(count)s
patches.
Open patches need to be resolved -- either accepted, rejected, or
postponed. Accepted patches need to be applied and closed. The
sooner this can be done the better. Open patches should be resolved
by the end of August in order to meet the 2.0b1 release deadline.
The specific patches assigned to you are:
"""
MSG_ENTRY_TEMPLATE = """Patch #%(patch_id)s: %(summary)s
http://sourceforge.net/patch/?func=detailpatch&group_id=5470&patch_id=%(patch_id)s
"""
def send_pest_mail(user, open, accepted):
    """Mail user a reminder listing their open and accepted patches.

    user     -- SF user name; "@users.sourceforge.net" is appended
    open     -- sequence of patch rows; the first two items of each
                row are used as patch id and summary
    accepted -- likewise, for accepted patches

    Raises ValueError if both sequences are empty.
    """
    user = user + "@users.sourceforge.net"
    n_open = len(open)
    n_accepted = len(accepted)
    if n_open and n_accepted:
        count = "%d open and %d accepted" % (n_open, n_accepted)
    elif n_open:
        count = "%d open" % n_open
    elif n_accepted:
        count = "%d accepted" % n_accepted
    else:
        raise ValueError("both open and accepted were empty")
    msg = MSG_TEMPLATE % {'user': user, 'count': count}
    status = []
    for patch_info in open + accepted:
        patch_id, summary = patch_info[:2]
        status.append(MSG_ENTRY_TEMPLATE % {'patch_id': patch_id,
                                            'summary': summary})
    msg = msg + "\n".join(status)
    s = smtplib.SMTP("smtp.concentric.net")
    try:
        s.sendmail(FROM, (user,), msg)
    finally:
        # drop the connection even when sending fails
        s.close()
def main():
    """Send a reminder to every user with open or accepted patches.

    The database is named after the USER environment variable.  Users
    with an id of 100 or less are skipped.
    """
    db = PatchDBInterface(pg.connect(os.environ['USER']))
    st_open = db.status.lookup('Open')
    st_accepted = db.status.lookup('Accepted')
    if VERBOSE:
        print(HEADER)
    for user, user_id in db.users.get_dict().items():
        if user_id <= 100:      # system-defined user ids
            continue
        open_rows = db.query(QUERY % (user_id, st_open)).getresult()
        accepted_rows = db.query(QUERY % (user_id, st_accepted)).getresult()
        if VERBOSE:
            print(ENTRY % (user, len(open_rows), len(accepted_rows)))
        if open_rows or accepted_rows:
            send_pest_mail(user, open_rows, accepted_rows)


if __name__ == "__main__":
    main()
--- NEW FILE: sf-schema.txt ---
-- Lookup table: one row per user, pairing a numeric id with a name.
CREATE TABLE users_t (
user_id int PRIMARY KEY,
username text NOT NULL
);
-- Lookup table of status names; referenced by patches_t and bug_t.
CREATE TABLE status_t (
status_id int PRIMARY KEY,
name text NOT NULL
);
-- One row per patch.  status, submitted_by and assigned_to hold ids
-- from the lookup tables; category and date are stored as text.
CREATE TABLE patches_t (
patch_id int PRIMARY KEY,
summary text,
status int REFERENCES status_t,
category text,
date text,
submitted_by int REFERENCES users_t,
assigned_to int REFERENCES users_t,
summary_url text
);
-- Lookup table of bug category names.
CREATE TABLE category_t (
category_id int PRIMARY KEY,
name text NOT NULL
);
-- Lookup table of bug resolution names.
CREATE TABLE resolution_t (
resolution_id int PRIMARY KEY,
name text NOT NULL
);
-- Lookup table of bug group names.
CREATE TABLE bug_group_t (
bug_group_id int PRIMARY KEY,
name text NOT NULL
);
-- One row per bug.  Every *_id column and assigned_to hold an id from
-- the corresponding lookup table; priority is a plain integer.
CREATE TABLE bug_t (
bug_id int PRIMARY KEY,
summary text,
bug_group_id int REFERENCES bug_group_t,
category_id int REFERENCES category_t,
resolution_id int REFERENCES resolution_t,
status_id int REFERENCES status_t,
assigned_to int REFERENCES users_t,
priority int
);
--- NEW FILE: sfdb.py ---
import pg
import re
def quote(s):
    """Escape s for use inside a single-quoted SQL string literal.

    Backslashes are doubled before quotes are escaped.  Otherwise a
    backslash in s would combine with the following character; a
    trailing one would escape the literal's closing quote.
    """
    return s.replace("\\", "\\\\").replace("'", "\\'")
class QueryResult:
    """Wrap a query result object and cache its getresult() value.

    len() gives the number of rows; any other attribute is looked up on
    the wrapped result.
    """

    def __init__(self, result):
        self.__result = result
        self.__tuple = None

    def getresult(self):
        """Return the rows, asking the wrapped result only once."""
        rows = self.__tuple
        if rows is None:
            rows = self.__tuple = self.__result.getresult()
        return rows

    def __len__(self):
        return len(self.getresult())

    def __getattr__(self, attr):
        return getattr(self.__result, attr)
class SQLMapping:
    """Decode a simple mapping from an SQL table

    An interface for a simple SQL table of the following sort:

    CREATE TABLE bug_group_t (
        bug_group_id int PRIMARY KEY,
        name text NOT NULL
    );

    The chief requirements are that the types of the two fields match
    the example above.

    Assumes that the keys and values are disjoint, so that a single
    interface can resolve in either direction.
    """

    def __init__(self, db, table, fields="*"):
        """Load every row of table from db into memory."""
        self.dict1 = {}         # key -> value
        self.dict2 = {}         # value -> key
        self.__db = db
        self.__table = table
        r = db.query("SELECT %s FROM %s" % (fields, table)).getresult()
        for key, val in r:
            assert None not in (key, val)
            self.dict1[key] = val
            self.dict2[val] = key

    def lookup(self, kv):
        """Return the value for key kv or the key for value kv, else None."""
        r = self.dict1.get(kv)
        if r is None:
            r = self.dict2.get(kv)
        return r

    def get_dict(self):
        """Return dict mapping value to key (e.g. name to numeric id)"""
        return self.dict2

    __insert_q = "INSERT INTO %(table)s VALUES (%(key)s, '%(value)s')"

    def insert(self, key, value):
        """Add the pair (key, value) to the table and to the cache."""
        query = self.__insert_q % {'table': self.__table,
                                   'key': key,
                                   'value': quote(value)}
        self.__db.query(query)
        # Cache the real value: storing the SQL-escaped text made
        # lookups of any value containing a quote fail.
        self.dict1[key] = value
        self.dict2[value] = key
class PatchDBInterface:
    """Interface between the PatchManager and the SQL database

    Scheme for the patches table is:

    CREATE TABLE patches_t (
        patch_id int PRIMARY KEY,
        summary text,
        status int REFERENCES status_t,
        category text,
        date text,
        submitted_by int REFERENCES users_t,
        assigned_to int REFERENCES users_t,
        summary_url text
    );
    """

    def __init__(self, db=None):
        """Use db, or a default pg connection when db is not given."""
        db = db or pg.connect()
        self.db = db
        self.users = SQLMapping(db, 'users_t')
        self.status = SQLMapping(db, 'status_t')
        # number of existing rows that update() has replaced
        self.num_deletes = 0

    def update(self, patch_id, attrs):
        """Store the patch, replacing any existing row for patch_id.

        attrs maps the labels of the patch page ('Status', 'Summary',
        'Submitted By', ...) to their text.
        """
        # resolve REFERENCES
        status = self.status.lookup(attrs['Status'])
        submitted_by = self.users.lookup(attrs['Submitted By'])
        if submitted_by is None:
            submitted_by = 0
        assigned_to = self.users.lookup(attrs['Assigned To'])
        if assigned_to is None:
            assigned_to = 100

        # delete old version if necessary
        if self.has_patch(patch_id):
            self.db.query("DELETE FROM patches_t WHERE patch_id = %d"
                          % patch_id)
            self.num_deletes = self.num_deletes + 1

        values = {'patch_id': patch_id,
                  'status': status,
                  'submitted_by': submitted_by,
                  'assigned_to': assigned_to,
                  }
        # the text fields go in under their page labels, already quoted
        for k, v in attrs.items():
            values[k] = pg._quote(v, 0)
        insert = "INSERT INTO patches_t VALUES (%(patch_id)d," \
                 " %(Summary)s, %(status)d, %(Category)s, %(Date)s," \
                 " %(submitted_by)d, %(assigned_to)d)"
        self.db.query(insert % values)

    def has_patch(self, patch_id):
        """Return 1 if patches_t has a row for patch_id, else 0."""
        r = self.db.query("SELECT * FROM patches_t"
                          " WHERE patch_id = %d" % patch_id).getresult()
        if not r:
            return 0
        return 1

    def query(self, query):
        """Run query on the underlying database and return its result."""
        return self.db.query(query)
class Bug:
    """Interface to bug_t row"""

    def __init__(self, bug_info, db):
        """bug_info is the row as a dictionary; db provides the mappings."""
        self.__db = db
        self.__dict = bug_info
        self.__clean = 1
        # Keep this one around solely for the benefit of the SF Web
        # interface; the local database ignores it.
        self.__details = None

    def get(self, attr):
        """Get an attribute of the bug

        This method understands a few different kinds of keys:
        names of bug_t columns, e.g. summary, status_id, etc.
        names of bug_t references, e.g. status returns the value
            referred to by status_id
        "details", which is kept outside the row

        Raises KeyError for anything else.
        """
        row = self.__dict
        if attr in row:
            return row[attr]
        ref = attr + "_id"
        if ref in row:
            return self.__get_ref(attr, ref)
        if attr != "details":
            raise KeyError("no attribute: %s" % attr)
        return self.__details

    def __get_mapping(self, table):
        # the mappings are attributes of the database interface
        try:
            return getattr(self.__db, table)
        except AttributeError:
            raise KeyError("no table for attribute: %s" % table)

    def __get_ref(self, table, id):
        return self.__get_mapping(table).lookup(self.__dict[id])

    def set(self, attr, value):
        """Set a column, a reference (by its value) or "details".

        Raises KeyError for an unknown attribute and ValueError when a
        reference value is not in its table.
        """
        if attr == 'details':
            self.__details = value
        elif attr in self.__dict:
            self.__clean = 0
            self.__dict[attr] = value
        elif (attr + "_id") in self.__dict:
            self.__set_ref(attr, attr + "_id", value)
        else:
            raise KeyError("no attribute: %s" % attr)

    def __set_ref(self, table, id, value):
        key = self.__get_mapping(table).lookup(value)
        if key is None:
            raise ValueError("invalid attribute for table %s: %s"
                             % (table, value))
        # __dict holds keys to values in the mapping
        self.__clean = 0
        self.__dict[id] = key

    def assign(self, username):
        """Assign the bug to the user named username."""
        # the set interface does not work well here
        self.__set_ref("users", "assigned_to", username)

    _update_ids = ('bug_group_id', 'status_id', 'category_id',
                   'resolution_id')

    def get_update_data(self):
        """Return data in the format expected by db update method"""
        d = {}
        for attr in Bug._update_ids:
            # "status_id" -> "status", which get() resolves to the name
            name = self.get(attr[:-3])
            if name is not None:
                d[attr] = name, self.get(attr)
        priority = self.get('priority')
        d['priority'] = priority, self.get('priority')
        user = self.__get_ref('users', 'assigned_to')
        if user is not None:
            d['assigned_to'] = user, self.get('assigned_to')
        # the second element of every pair is passed on as a string
        for attr in list(d.keys()):
            first, second = d[attr]
            d[attr] = first, str(second)
        return self.__dict['bug_id'], self.__dict['summary'], d

    _form_keys = ('bug_id', 'summary', 'category_id', 'priority',
                  'bug_group_id', 'resolution_id', 'assigned_to',
                  'status_id')

    def get_form_data(self):
        """Return the bug as a dict of strings; None becomes "100"."""
        d = {}
        for name in Bug._form_keys:
            val = self.get(name)
            if val is None:
                val = 100
            d[name] = str(val)
        if self.__details is not None:
            d['details'] = self.__details
        return d

    def flush_sf(self):
        # XXX not sure what to do here...
        pass

    def flush_local(self):
        """Write the bug to the local database."""
        bug_id, summary, attrs = self.get_update_data()
        self.__db.update(bug_id, summary, attrs)

    def flush(self):
        """Write pending changes, if any, and mark the bug clean."""
        if self.__clean:
            return
        self.flush_sf()
        self.flush_local()
        self.__clean = 1
class BugDBInterface:
    """Interface to bug_t

    The reference tables are available as the attributes bug_group,
    category, status, users and resolution.
    """

    def __init__(self, db=None):
        """Use db, or a default pg connection when db is not given."""
        db = db or pg.connect()
        self.__db = db
        self.bug_group = SQLMapping(db, "bug_group_t")
        self.category = SQLMapping(db, "category_t")
        self.status = SQLMapping(db, "status_t")
        self.users = SQLMapping(db, "users_t")
        self.resolution = SQLMapping(db, "resolution_t")

    def __query(self, query):
        print(query)
        return QueryResult(self.__db.query(query))

    def get(self, bug_id):
        """Return a Bug for bug_id, or None if bug_t has no such row."""
        r = self.__query('select * from bug_t where bug_id = %d'
                         % int(bug_id))
        if len(r) == 0:
            return None
        assert len(r) == 1
        return Bug(r.dictresult()[0], self)

    def insert(self, bug_id, summary, attrs):
        """Load bug using the info from the buglib BugParser"""
        self._do_bug_query(bug_id, summary, attrs, self.__insert_q)

    def update(self, bug_id, summary, attrs):
        """Update existing bug using info from buglib BugParser"""
        bug_id = int(bug_id)
        self.__query("BEGIN WORK")
        try:
            self.__query('DELETE FROM bug_t WHERE bug_id = %d' % bug_id)
            self.insert(bug_id, summary, attrs)
        except:
            # do not leave the transaction open (with the old row
            # deleted) when the replacement cannot be inserted
            self.__query("ROLLBACK WORK")
            raise
        self.__query("COMMIT WORK")

    # The optional pieces carry their own leading ", " so that the
    # statement stays valid when no optional column has a value.
    __insert_q = "INSERT INTO bug_t (bug_id, summary" \
                 "%(optional_keys)s, priority) VALUES " \
                 "(%(bug_id)s, '%(summary)s'%(optional_values)s, " \
                 "%(priority)s)"

    def _do_bug_query(self, bug_id, summary, attrs, query):
        """Insert one bug row.

        attrs maps field names to (label, id) pairs; "priority" is
        required.  NOTE: query is accepted for compatibility, but, as
        before, the built-in INSERT statement is always the one used.
        """
        values = {'bug_id': int(bug_id),    # never interpolate raw text
                  'summary': quote(summary),
                  'priority': int(attrs["priority"][1]),
                  }
        optional_keys, optional_values = self._prep_query_refs(attrs)
        if optional_keys:
            optional_keys = ", " + optional_keys
            optional_values = ", " + optional_values
        values['optional_keys'] = optional_keys
        values['optional_values'] = optional_values
        self.__query(self.__insert_q % values)

    # (mapping attribute, bug_t column) for each optional reference
    _ref_columns = (("bug_group", "bug_group_id"),
                    ("category", "category_id"),
                    ("resolution", "resolution_id"),
                    ("status", "status_id"),
                    ("users", "assigned_to"))

    def _prep_query_refs(self, attrs):
        """Resolve the reference fields found in attrs.

        Returns two comma-separated strings: the names of the columns
        that have a value, and those values in the same order.  Both
        are empty when no reference field is present.
        """
        self._new_query()
        present = []
        for table, column in self._ref_columns:
            ref_id = self._lookup_ref(getattr(self, table), attrs, column)
            if ref_id is not None:
                present.append(column)
        # self.refs only keeps the fields that have values
        for column in list(self.refs.keys()):
            if self.refs[column] is None:
                del self.refs[column]
        optional_keys = ", ".join(present)
        optional_values = ", ".join([str(self.refs[c]) for c in present])
        return optional_keys, optional_values

    __exists_q = "SELECT * FROM bug_t WHERE bug_id = %(bug_id)s"

    def exists(self, bug_id):
        """Return 1 if bug_t has a row for bug_id, else 0."""
        if self.__query(self.__exists_q % {'bug_id': int(bug_id)}):
            return 1
        else:
            return 0

    def _new_query(self):
        self.refs = {}

    def _lookup_ref(self, table, attrs, attrname):
        """Record and return the id for attrs[attrname], or None.

        A label the mapping does not know yet is added to it under the
        id that came with the label.
        """
        pair = attrs.get(attrname)
        if pair is None:
            self.refs[attrname] = None
            return None
        value, key = pair
        ref_id = table.lookup(value)
        if ref_id is None:
            ref_id = int(key)
            table.insert(ref_id, value)
        self.refs[attrname] = ref_id
        return ref_id
--- NEW FILE: sflib.py ---
"""Routines for interacting with SourceForge interfaces"""
import cgi
import re
import urllib
import urlparse
def urldecode(query):
d = cgi.parse_qs(query)
for k, v in d.items():
if len(v) != 1:
raise ValueError, "unexpected duplicate entry"
d[k] = v[0]
return d
class SummaryParser:
    """Collect the query hrefs of interest from SF summary pages.

    root_url -- base against which the URLs passed to load() are resolved
    funcs    -- the values of the "func" query field to keep

    Kept links are stored in hrefs as (href, query dictionary, line)
    triples.  A "browse" link with an offset beyond the current one is
    followed after each page, so later pages are loaded as well.
    """

    rx_href = re.compile(r'HREF="(\S*\?[A-Za-z0-9=&_]+)"')

    VERBOSE = 0

    def __init__(self, root_url, funcs, verbose=None):
        if verbose:
            self.VERBOSE = verbose
        self.root_url = root_url
        self.offset = 0
        self.hrefs = []
        self.next = None
        # used as a set of the func values to keep
        self.funcs = dict([(func, 1) for func in funcs])

    def get_hrefs(self):
        """Return the list of (href, query dictionary, line) triples."""
        return self.hrefs

    def load(self, _url, offset=None):
        """Fetch _url (relative to root_url) and parse the response."""
        url = urlparse.urljoin(self.root_url, _url)
        if self.VERBOSE:
            print("loading %s" % url)
        if offset is not None:
            self.offset = offset
        f = urllib.urlopen(url)
        resp = f.read()
        f.close()
        self.parse(resp)

    def parse(self, buf):
        """Scan buf line by line for hrefs, then follow any next page."""
        search = self.rx_href.search
        for line in buf.split("\n"):
            mo = search(line, 0)
            while mo:
                self.handle_href_match(mo, line)
                # continue right after the href text just handled
                mo = search(line, mo.end(1))
        if self.VERBOSE:
            print("found %d hrefs" % len(self.hrefs))
        if self.next:
            self.load_next()

    def handle_href_match(self, mo, line):
        query = mo.group(1)
        self.handle_query(query, self.parse_query(query), line)

    def handle_query(self, query, dict, line):
        """Record the link if its func is wanted; note a next-page link."""
        if self.VERBOSE:
            print(query)
        if 'func' not in dict:
            return
        func = dict['func']
        if func == 'browse' and 'offset' in dict:
            if int(dict['offset']) > self.offset:
                self.next = query, dict
        if self.keep_func(func):
            self.hrefs.append((query, dict, line))

    def keep_func(self, func):
        """Return 1 if func is one of the wanted funcs, else None."""
        if func in self.funcs:
            return 1
        return None

    def parse_query(self, href):
        """Return the decoded query part (after the first "?") of href."""
        return urldecode(href[href.find("?") + 1:])

    def load_next(self):
        """Load the page recorded in self.next and clear it."""
        assert self.next is not None
        query, dict = self.next
        self.next = None
        self.load(query, int(dict['offset']))
--- NEW FILE: upload.py ---
#! /usr/bin/env python
"""Enter Jitterbug reports into the SF Bug Tracker
upload.py [OPTIONS] [jitterbug_path]
This script reads Jitterbug data from its filesystem representation
and uploads each bug report to the SourceForge Bug Tracker.
Jitterbug stores each of its bug categories in a separate directory
(e.g. open, notabug, etc.). This script reads all of the bug files
from a single directory. To upload an entire Jitterbug database, the
script must be run once on each directory.
The command-line options are used to specify metadata for SF. For
each option, you must specify the SF id for the field, which you can
find by viewing the source of the standard bug submission form.
-P [project_group_id] which project to add bugs to (REQUIRED)
-p [1..10] bug priority (default 5)
-c [category_id] SF bug category
-g [group_id] SF bug group
The -v option provides verbose output.
The -h option prints this message.
"""
import sys
import os
import re
import getopt
import jitterbuglib
VERBOSE = 0
_rx_num = re.compile('\d+$')
def find_bugs(dir):
    """Return a list of the bug ids contained in dir, in numeric order.

    A bug id is the name of any entry in dir that _rx_num matches.
    The ids are returned as strings, sorted by their integer value so
    that "9" comes before "10"; a plain string sort put it after.
    """
    bug_ids = [name for name in os.listdir(dir) if _rx_num.match(name)]
    bug_ids.sort(key=int)
    return bug_ids
def main(dir):
    """Load every bug found in dir and submit it to the SF bug tracker."""
    submit_url = jitterbuglib.SF_SUBMIT_URL
    for bid in find_bugs(dir):
        bug = jitterbuglib.Bug(dir, bid)
        if VERBOSE:
            print("loaded PR#%s" % bid)
        bug.submit(submit_url)
def usage(code, msg=''):
    """Write the module docstring, and msg if given, to stderr; then exit.

    code is passed to sys.exit().
    """
    err = sys.stderr
    err.write("%s\n" % (__doc__,))
    if msg:
        err.write("%s\n" % (msg,))
    sys.exit(code)
if __name__ == "__main__":
    proj_group_set = 0
    try:
        opts, args = getopt.getopt(sys.argv[1:], 'P:g:vp:hc:')
    except getopt.error as msg:
        usage(1, msg)

    # Handle the options before checking the arguments: "-h" on its own
    # used to be answered with "jitterbug_path missing" and exit code 1.
    labels = {'-g': "GROUP",
              '-p': "PRIORITY",
              '-c': "CATEGORY",
              '-P': "PROJECT_GROUP_ID",
              }
    for k, v in opts:
        if k == '-v':
            VERBOSE = 1
        elif k == '-h':
            usage(0)
        elif k in labels:
            jitterbuglib.set_label(labels[k], v)
            if k == '-P':
                proj_group_set = 1

    if len(args) > 1:
        usage(1, 'unexpected arguments: ' + ' '.join(args[1:]))
    if len(args) == 0:
        usage(1, 'jitterbug_path missing')
    if not proj_group_set:
        usage(1, '-P option is required')
    # all's well
    main(args[0])