My database-backed newsgroup server (test)

Netkiller chan.neo at gmail.com
Tue Jul 4 02:46:20 EDT 2006


Netkiller 写道:

> #!/usr/bin/python
> # -*- coding: utf-8 -*-
> """
> Project: Network News Transfer Protocol Server Program
> Description:
> 基于数据库的新闻组,实现BBS前端使用NNTP协议来访问贴子
> (A database-backed newsgroup server that lets a BBS front end access posts over NNTP.)


import sys,re
import MySQLdb

class NewsDB:
    """Run one-off SQL statements against the local ``usenet`` MySQL database.

    Each ``fetch*`` call opens a fresh connection, executes one statement,
    and closes the connection again before returning.
    """

    conn = None    # open MySQLdb connection, or None while disconnected
    cursor = None  # cursor of the statement in flight, or None

    def connect(self):
        """Open the database connection and keep it on ``self.conn``.

        On a MySQL error the error number and message are printed and the
        process exits with status 1.
        """
        try:
            # Must be stored on the instance: the fetch methods and close()
            # read self.conn after this method has returned.
            self.conn = MySQLdb.connect(host="localhost",
                                        user="root",
                                        passwd="chen",
                                        db="usenet")
        except MySQLdb.Error as e:
            print("Error %d: %s" % (e.args[0], e.args[1]))
            sys.exit(1)

    def fetchone(self, sql):
        """Execute *sql* and return its first row (``None`` if no rows)."""
        self.connect()
        try:
            self.cursor = self.conn.cursor()
            self.cursor.execute(sql)
            return self.cursor.fetchone()
        finally:
            # Release the cursor and connection even if the query raised.
            self.close()

    def fetchall(self, sql):
        """Execute *sql* and return every result row."""
        self.connect()
        try:
            self.cursor = self.conn.cursor()
            self.cursor.execute(sql)
            return self.cursor.fetchall()
        finally:
            self.close()

    def close(self):
        """Close the cursor and the connection; safe to call when idle."""
        if self.cursor is not None:
            self.cursor.close()
            self.cursor = None
        if self.conn is not None:
            self.conn.close()
            self.conn = None
class abstract:
    """In-memory news store that answers NNTP queries with canned data.

    The overview records below were line-wrapped and had their ``@`` signs
    rewritten as `` at `` by the mail archive this file came from.  They are
    rebuilt here as single-line, tab-separated records in the order
    number, subject, from, date, message-id, references, bytes, lines, Xref.
    """

    subject = None
    mail_from = None
    rcpt_to = None
    data = None

    # One entry per group: (name, last article number, first article number,
    # posting flag).
    grouplist = (
        ("cn.comp.linux", 5, 2, "y"),
        ("cn.comp.freebsd", 3, 2, "y"),
        ("cn.comp.dos", 10, 4, "y"),
        ("cn.test", 5, 2, "y"),
        ("comp.lang.python", 5, 2, "y"),
    )

    def _overview(self, *fields):
        """Join *fields* into one tab-separated overview record."""
        return "\t".join(fields)

    def welcome(self):
        """Return the greeting text."""
        return 'Welcome'

    def list(self):
        """Return one ``"name last first flag"`` string per known group."""
        return ["%s %s %s %s" % (name, last, first, mode)
                for name, last, first, mode in self.grouplist]

    def group(self, groupname):
        """Return ``"<last - first> <first> <last>"`` for *groupname*.

        Returns an empty string when the group is not in ``grouplist``.
        """
        for name, last, first, _mode in self.grouplist:
            if name == groupname:
                return "%s %s %s" % (last - first, first, last)
        return ""

    def xover(self, first, last):
        """Return two canned overview records; *first*/*last* are ignored."""
        return [
            self._overview(
                "2",
                "Mozilla programmer needed for children's learning program",
                '"John Fodor, PhD" <fodor@ilt-inc.com>',
                "Mon, 23 Jan 2006 12:01:09 -0500",
                "<mailman.349.1138141614.8933.jobs@lists.mozilla.org>",
                "",  # References: none
                "10532",
                "276",
                "Xref: number1.nntp.dca.giganews.com mozilla.jobs:2",
            ),
            self._overview(
                "3",
                "mozilla expert needed",
                '"Paul Sponagl" <sponagl@7val.com>',
                "Tue, 7 Mar 2006 20:00:52 +0100",
                "<mailman.1692.1141758057.18700.jobs@lists.mozilla.org>",
                "",  # References: none
                "2913",
                "41",
                "Xref: number1.nntp.dca.giganews.com mozilla.jobs:10",
            ),
        ]

    def head(self, first=None, last=None):
        """Return the canned overview records (delegates to ``xover``).

        The arguments are optional so the method can still be called with no
        arguments, as its original signature allowed.
        """
        return self.xover(first, last)

    def xhdr(self, first, last):
        """Return one canned CRLF-terminated record; arguments are ignored."""
        record = self._overview(
            "1",
            "HI",
            '"NNTP.HK" <admin@nntp.hk>',
            "14 Jun 2006 14:43:05 +0800",
            "<448faff9$1@news.nntp.hk>",
            "",  # References: none
            "1151",
            "31",
            "Xref: news.nntp.hk vip.cicefans:1",
        )
        return [record + "\r\n"]

    def newgroups(self, data, time, gmt):
        """Return a fixed list of new group names; arguments are ignored."""
        return ["cn.test.os", "cn.test.qa"]

class Messages(abstract):
    """MySQL-backed news store: groups come from ``list``, posts from ``article``.

    All statements go through the connection opened in ``__init__`` and pass
    their values as query parameters, never by string interpolation, because
    group names, article numbers and posted text come from NNTP clients.
    """

    banner = '200 \"Welcome to Netkiler News server\"\r\n'
    conn = None

    # Message-ID reported for every article by xover().
    # NOTE(review): the article table appears to have a MessageID column
    # (second column of ``select *``); confirm its format and use it instead.
    _PLACEHOLDER_MESSAGE_ID = '<3dc0af03@netkiller.hikz.com>'

    # (header name, compiled pattern) pairs, compiled once for parseArticle().
    # NOTE(review): the patterns are not anchored to the start of a line, so
    # e.g. "Date: " also matches inside "NNTP-Posting-Date: " or body text.
    _HEADER_PATTERNS = tuple(
        (name, re.compile(pattern, re.IGNORECASE))
        for name, pattern in (
            ('From', r'From: (.+ <.+>)\r\n'),
            ('Newsgroups', r'Newsgroups: (.+)\r\n'),
            ('Subject', r'Subject: (.+)\r\n'),
            ('Date', r'Date: (.+)\r\n'),
            ('Lines', r'Lines: (.+)\r\n'),
            ('X-Priority', r'X-Priority: (.+)\r\n'),
            ('X-MSMail-Priority', r'X-MSMail-Priority: (.+)\r\n'),
            ('X-Newsreader', r'X-Newsreader: (.+)\r\n'),
            ('X-MimeOLE', r'X-MimeOLE: (.+)\r\n'),
            ('X-RFC2646', r'X-RFC2646: (.+)\r\n'),
            ('MIME-Version', r'MIME-Version: (.+)\r\n'),
            ('User-Agent', r'User-Agent: (.+)\r\n'),
            ('Content-Type', r'Content-Type: (.+)\r\n'),
            ('Content-Transfer-Encoding',
             r'Content-Transfer-Encoding: (.+)\r\n'),
            ('Xref', r'Xref: (.+)\r\n'),
            ('Body', r'\r\n\r\n(.+)\r\n'),
        )
    )

    def __init__(self, host="localhost", user="root", passwd="chen",
                 db="usenet"):
        """Connect to MySQL; the defaults are the original hard-coded values."""
        self.conn = MySQLdb.connect(host=host, user=user, passwd=passwd, db=db)

    def _query(self, sql, params=None):
        """Run *sql* with *params* and return all rows, closing the cursor."""
        cursor = self.conn.cursor()
        try:
            cursor.execute(sql, params)
            return cursor.fetchall()
        finally:
            cursor.close()

    def _fetch_article(self, number):
        """Return the stored text of article *number*, or None if absent."""
        rows = self._query("select * from article where Number = %s",
                           (number,))
        if not rows:
            return None
        # Third column of the article table holds the message text.
        return rows[0][2]

    def list(self):
        """Return one ``"name last first flag"`` string per row of ``list``.

        ``last``/``first`` are the highest and lowest article numbers in the
        whole ``article`` table.
        """
        sql = ("SELECT id, `group`, "
               "(select max(Number) from article) as last, "
               "(select min(Number) from article) as first, "
               "p FROM list")
        # fetchall() via _query returns every group; the previous
        # fetch_row(10) silently stopped after ten rows.
        return ["%s %s %s %s" % (name, last, first, mode)
                for _id, name, last, first, mode in self._query(sql)]

    def group(self, newsgroup):
        """Return ``(count, first, last, name)`` as strings for *newsgroup*.

        Returns None when the group does not exist.
        """
        sql = ("SELECT `group`, "
               "(select count(Number) from article) as number, "
               "(select max(Number) from article) as last, "
               "(select min(Number) from article) as first "
               "FROM list where `group` = %s limit 1")
        for name, number, last, first in self._query(sql, (newsgroup,)):
            return (str(number), str(first), str(last), name)
        return None

    def xover(self, first, last):
        """Return overview records for article numbers *first* to *last*.

        Each record is tab-separated in overview order: number, subject,
        from, date, message-id, references, byte count, line count, and an
        optional ``Xref: ...`` field.  Headers missing from an article yield
        empty fields.

        Raises:
            ValueError: if *first* or *last* is not an integer value.
        """
        sql = "select * from article where Number BETWEEN %s AND %s"
        xovers = []
        rows = self._query(sql, (int(first), int(last)))
        for Number, MessageID, Body, Date in rows:
            parse = self.parseArticle(Body)
            fields = [
                str(Number),
                parse.get('Subject', ''),
                parse.get('From', ''),
                parse.get('Date', ''),
                self._PLACEHOLDER_MESSAGE_ID,
                '',  # References: not extracted by parseArticle()
                str(len(Body)),
                parse.get('Lines', ''),
            ]
            if 'Xref' in parse:
                # Optional overview fields carry the full "Name: value" form.
                fields.append('Xref: ' + parse['Xref'])
            xovers.append('\t'.join(fields))
        return xovers

    def post(self, text):
        """Store *text* as a new article and return the inserted row count."""
        cursor = self.conn.cursor()
        try:
            cursor.execute(
                "insert into article(body, `date`) values(%s, now())",
                (text,))
            self.conn.commit()
            # Read rowcount while the cursor is still open.
            return cursor.rowcount
        finally:
            cursor.close()

    def article(self, id):
        """Return the stored text of article *id*, or None if absent."""
        return self._fetch_article(id)

    def head(self, id):
        """Return the stored text of article *id*, or None if absent.

        NOTE(review): returns the whole stored message, same as article();
        splitting off the header section is left to the caller.
        """
        return self._fetch_article(id)

    def body(self, id):
        """Return the stored text of article *id*, or None if absent.

        NOTE(review): returns the whole stored message, same as article();
        splitting off the body section is left to the caller.
        """
        return self._fetch_article(id)

    def parseArticle(self, data):
        """Extract known header values from CRLF-delimited article text.

        Example of the header block this understands::

            From: "NEO" <openunix@163.com>
            Newsgroups: cn.test
            Subject: test
            Date: Fri, 30 Jun 2006 17:14:17 +0800
            Lines: 3
            X-Newsreader: Microsoft Outlook Express 6.00.2900.2180

        Returns a dict mapping header name to value, containing only the
        headers that matched; when a pattern matches more than once the last
        match wins.  The ``'Body'`` key holds a single line that follows a
        blank line.
        """
        parse = {}
        for head, pattern in self._HEADER_PATTERNS:
            found = pattern.findall(data)
            if found:
                parse[head] = found[-1]
        return parse
def main():
    """Smoke test: print the overview records for articles 25 through 50.

    Needs the MySQL database that ``Messages`` connects to.
    """
    store = Messages()
    overview = store.xover('25', '50')
    # Call form with a single argument prints the same under Python 2 and 3.
    print(overview)


if __name__ == '__main__':
    main()




More information about the Python-list mailing list