# -*- Mode: Python; tab-width: 4 -*-
#
# Copyright (C) 2003 Gianluigi Tiesi
# Copyright (C) 2003 NetFarm S.r.l.  [http://www.netfarm.it]
#
# REQUIRES python >= 2.3 (uses the `in` operator on dicts and strings)
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
# for more details.
# ======================================================================

__version__= '0.5'
__doc__="""Cyrus admin wrapper
Adds cyrus-specified commands to imaplib IMAP4 Class
and defines new CYRUS class for cyrus imapd commands like cyradm"""

import imaplib
import re

# Extra IMAP commands and the connection states in which they are allowed;
# merged into imaplib's own command table below.
Commands = {
    'SETACL': ('AUTH', 'SELECTED'),
    'GETACL': ('AUTH', 'SELECTED'),
    'GETQUOTA': ('AUTH', 'SELECTED'),
    'SETQUOTA': ('AUTH', 'SELECTED'),
    'NAMESPACE': ('AUTH',)
    }

# Hierarchy separator used until (or unless) the server reports another one.
DEFAULT_SEP = '.'
QUOTE = '"'

imaplib.Commands.update(Commands)

#re_sep = re.compile(r"\(\(\"INBOX.\" \"(.)\"\)\)", re.IGNORECASE)
# Captures the single character that follows "user" in a NAMESPACE response.
re_sep = re.compile(r".*user(.).*", re.IGNORECASE)


def _to_text(value):
    """Return `value` as text.

    Byte strings (what imaplib hands back on Python 3) are decoded as UTF-8
    with undecodable bytes replaced; anything else, including None, is
    returned unchanged.
    """
    if not isinstance(value, str) and hasattr(value, 'decode'):
        return value.decode('utf-8', 'replace')
    return value


def imap_ok(res):
    """Return true if the string form of `res` contains 'OK' (any case)."""
    return 'OK' in str(res).upper()


class IMAP4(imaplib.IMAP4):
    """imaplib.IMAP4 with the ACL, QUOTA and NAMESPACE commands used below."""

    def getsep(self):
        """Gets mailbox separator

        Sends NAMESPACE and returns the character following "user" in the
        first untagged NAMESPACE response, or DEFAULT_SEP when the response
        does not match.  Raises self.error when the server sent no
        NAMESPACE response at all.
        """
        name = 'NAMESPACE'
        self._simple_command(name)
        if name not in self.untagged_responses:
            raise self.error('no NAMESPACE response from server')
        # Decode first: the pattern is a text pattern and would raise
        # TypeError if matched against bytes.
        namespace = _to_text(self.untagged_responses[name][0])
        res = re_sep.match(namespace)
        if res:
            return res.group(1)
        return DEFAULT_SEP

    def getacl(self, mailbox):
        """Gets acl on mailbox

        Returns the (type, data) pair built from the untagged ACL response.
        """
        typ, dat = self._simple_command('GETACL', mailbox)
        return self._untagged_response(typ, dat, 'ACL')

    def setacl(self, mailbox, id, acl):
        """Sets acl for a mailbox

        Grants identifier `id` the rights string `acl` on `mailbox` and
        returns the (type, data) pair of the SETACL command.
        """
        return self._simple_command('SETACL', mailbox, id, acl)

    def getquota(self, mailbox):
        """Gets quota on a mailbox

        Returns the (type, data) pair built from the untagged QUOTA response.
        """
        typ, dat = self._simple_command('GETQUOTA', mailbox)
        return self._untagged_response(typ, dat, 'QUOTA')

    def setquota(self, mailbox, type, limit):
        """Sets quota on a mailbox

        Sends SETQUOTA with the list "(<type> <limit>)" and returns the
        (type, data) pair of the command.
        """
        quota = "(%s %s)" % (type, limit)
        return self._simple_command('SETQUOTA', mailbox, quota)


class CYRUS:
    """cyradm-like helper built on the IMAP4 class above.

    Mailboxes are addressed as <group><separator><username>.  Most methods
    do nothing and return a false value (0 or None) until login() succeeds.
    """

    def __init__(self, host = '', port = imaplib.IMAP4_PORT):
        """Connect to `host`:`port`; no login is attempted here."""
        self.m = IMAP4(host, port)
        self.auth = 0             # 1 once login() has succeeded
        self.admin = ''           # userid of the logged-in administrator
        self.sep = DEFAULT_SEP    # hierarchy separator, refreshed by login()

    def _mailbox(self, group, username):
        """Build the full mailbox name for `username` inside `group`."""
        return group + self.sep + username

    def login(self, username, password):
        """Login and store in self.admin the admin userid.

        Also asks the server for its hierarchy separator.  Returns 1 on
        success; on any failure sets self.auth to 0 and returns 0.
        """
        try:
            self.m.login(username, password)
            sep = self.m.getsep()
        except Exception:
            # Either step failing counts as a failed login.
            self.auth = 0
            return 0
        self.auth = 1
        self.admin = username
        self.sep = sep
        return 1

    def logout(self):
        """Logout.  Returns 1 on success, 0 if the server call failed.

        The local login state is cleared in both cases.
        """
        self.auth = 0
        self.admin = ''
        try:
            self.m.logout()
        except Exception:
            return 0
        return 1

    def cm(self, group, username):
        """Create mailbox and set 'c' acl to admin.

        Returns a true value only if both the CREATE and the SETACL
        succeeded; returns 0 when not logged in.
        """
        if not self.auth:
            return 0
        mailbox = self._mailbox(group, username)
        res = self.m.create(mailbox)[0]
        if imap_ok(res):
            res = self.m.setacl(mailbox, self.admin, 'c')[0]
        return imap_ok(res)

    def dm(self, group, username):
        """Delete mailbox.  Returns a true value on success, 0 if not
        logged in."""
        if not self.auth:
            return 0
        res = self.m.delete(self._mailbox(group, username))[0]
        return imap_ok(res)

    def lm(self):
        """List of mailboxes, returns dict(with groups) of list of users.

        Issues LIST with reference "*" and pattern "user<sep>%".  Each
        response line of exactly three whitespace-separated fields
        contributes its last field: surrounding double quotes are removed
        and the name is split at the first separator into group and user.
        Lines that do not fit that shape are skipped.  Returns None when not
        logged in or when the LIST failed.
        """
        if not self.auth:
            return None
        userstr = "user%s%%" % self.sep
        res, ml = self.m.list("*", userstr)
        if not imap_ok(res):
            return None
        mb = {}
        for m in ml:
            line = _to_text(m)
            # Skips the None placeholder returned when there are no users,
            # and any entry that is not a plain string.
            if not line or not hasattr(line, 'split'):
                continue
            fields = line.split()
            if len(fields) != 3:
                continue
            mailbox = fields[-1]
            if (len(mailbox) >= 2 and mailbox[0] == QUOTE
                    and mailbox[-1] == QUOTE):
                mailbox = mailbox[1:-1]
            if self.sep not in mailbox:
                continue
            group, user = mailbox.split(self.sep, 1)
            mb.setdefault(group, []).append(user)
        return mb

    def lam(self, group, username):
        """List Acl on mailbox, returns dict of acl for every user.

        The response is expected to start with the mailbox name followed by
        one separator character, then alternating identifier / rights
        tokens; a trailing identifier without rights is ignored.  Returns
        an empty dict when the server sent no ACL data and None when not
        logged in or when the GETACL failed.
        """
        if not self.auth:
            return None
        mailbox = self._mailbox(group, username)
        res, acl = self.m.getacl(mailbox)
        if not imap_ok(res):
            return None
        acl = acl and _to_text(acl[0])
        if not acl:
            return {}
        # Drop the leading mailbox name and the character after it.
        acl_list = acl[len(mailbox)+1:].split()
        acls = {}
        for ident, rights in zip(acl_list[0::2], acl_list[1::2]):
            acls[ident] = rights
        return acls

    def sam(self, group, username, id, rights):
        """Set Acl on mailbox.  Returns a true value on success, 0 if not
        logged in."""
        if not self.auth:
            return 0
        # Only the status element is checked; testing the whole (type, data)
        # pair would report success whenever the error text contains "ok".
        res = self.m.setacl(self._mailbox(group, username), id, rights)[0]
        return imap_ok(res)

    def lq(self, group, username):
        """Get Quota on user.

        Returns (used, limit) as integers, parsed from a response of the
        form "<mailbox> (<resource> <used> <limit>)".  Returns (None, None)
        when not logged in, when the GETQUOTA failed, or when the response
        does not have that shape.
        """
        if self.auth:
            mailbox = self._mailbox(group, username)
            res, quota = self.m.getquota(mailbox)
            if imap_ok(res):
                try:
                    quota = _to_text(quota[0])
                    # Strip the mailbox name plus one character, then the
                    # enclosing parentheses.
                    quota = quota[len(mailbox)+1:]
                    quota = quota[1:len(quota)-1]
                    resource, used, limit = quota.split()
                    return int(used), int(limit)
                except (TypeError, ValueError, IndexError, AttributeError):
                    # Missing or unexpected quota data: report "unknown".
                    pass
        return None, None

    def sq(self, group, username, limit):
        """Set quota for mailbox.

        `limit` must be convertible to int and is sent as the STORAGE
        limit.  Returns a true value on success, None if `limit` is not a
        number, 0 if not logged in.
        """
        if not self.auth:
            return 0
        try:
            limit = int(limit)
        except (TypeError, ValueError):
            return None
        mailbox = self._mailbox(group, username)
        res = self.m.setquota(mailbox, 'STORAGE', limit)[0]
        return imap_ok(res)