""" Tiny Python 3.2 wiki framework, supports multiple wikis, logins, CentralAuth login, page reads and writes and getting user information Version: 1.0.0 Last modified: 2011-10-21 Author: Hoo man """ version = "1.0.0" import urllib.request import urllib.parse import xml.etree.ElementTree import re import datetime import time default_user_agent = "Hoo's ([[User:Hoo man]]) wiki tools, Version " + version + ", no custom header set" class wiki: #is the user logged in? (bool) logged_in = False #http headers send with every request, please DON'T set cookies here headers = {} #dict. for cookies, has sub dicts. for global cookies (['global']) and per wiki ones cookies = {} #list of wikis ('name' : script_path} #please add every wiki only once or at least only use one name, because using mulitple names for one wiki will cause a cookie mess wiki_list = {} #the wiki used if not wiki is specified (should only be used if only a single wiki is in the wiki list) main_wiki = '' # # Creates a wiki object with the given wikis and logs in as the given user # # wiki_list : (dict) wikis to include # login_wiki : (string) the wiki the login should be performed on # user : (string) the user you want to log in as # password : (string) the password # user_agent : (string) the user agent sent to the servers def __init__(self, wiki_list, login_wiki=False, user=False, password=False, user_agent=None): self.wiki_list = wiki_list if login_wiki: self.main_wiki = login_wiki else: self.main_wiki = list(self.wiki_list.keys())[0] if user_agent: self.headers['User-Agent'] = user_agent else: self.headers['User-Agent'] = default_user_agent if user and password: self.login(user, password, login_wiki) def __del__(self): if self.logged_in: self.logout() # # Function for http request with own cookie management # def http_request(self, wiki, uri, params, type='GET', return_result_object = False, additional_headers = None): uri = self.wiki_list[wiki] + uri params = urllib.parse.urlencode(params) #build http headers headers = dict({}, **self.headers) #global headers if additional_headers: headers.update(additional_headers) if not 'Cookie' in headers: headers['Cookie'] = '' #get cookies for the local wiki (if set) if wiki in self.cookies: for cookie_name, cookie_value in self.cookies[wiki].items(): headers['Cookie'] += cookie_name + '=' + cookie_value + ';'; #get global cookies if 'global' in self.cookies: for cookie_name, cookie_value in self.cookies['global'].items(): headers['Cookie'] += cookie_name + '=' + cookie_value + ';'; if type == 'GET': uri += '?' + params request = urllib.request.Request(uri, headers=headers); else: #POST post_data = params.encode('UTF-8') request = urllib.request.Request(uri, post_data, headers) try: result = urllib.request.urlopen(request) if not return_result_object: text = result.read() except: return False #success, add the new set cookies to self.cookies[wiki] :) if not wiki in self.cookies: self.cookies[wiki] = {} for i in result.getheaders(): if i[0] == "Set-Cookie": m = re.match('([^=]+)=([^;]+);', i[1]) if m: name = m.group(1) value = m.group(2) if name.find('centralauth_') == -1: self.cookies[wiki][name] = value else: if not 'global' in self.cookies: self.cookies['global'] = {} self.cookies['global'][name] = value #return the result text if return_result_object: return result else: return text # # Returns the text of a page # # page : (string) the name of the page # rev_id : (int) the id of the revision to return # wiki : (string) the wiki to use def get_page(self, page=None, rev_id=None, wiki=None): if not wiki: wiki = self.main_wiki uri = '/index.php' params = {'action' : 'raw'} if page: params['title'] = page.replace(' ', '_') if rev_id: params['oldid'] = rev_id text = self.http_request(wiki, uri, params) if text: return text else: return False # # Logs an user in # # user : (string) the user you want to log in as # password : (string) the password # wiki : (string) the wiki the login should be performed on def login(self, user, password, wiki=False): if not wiki: wiki = self.main_wiki uri = '/api.php' params = {'action' : 'login', 'format' : 'xml', 'lgname' : user, 'lgpassword' : password} data = self.http_request(wiki, uri, params, 'POST') try: tree = xml.etree.ElementTree.fromstring(data) result_xml = list(tree.iter('login'))[0] status = result_xml.attrib['result'] except: return False if status == 'NeedToken': token = result_xml.attrib['token'] params = {'action' : 'login', 'format' : 'xml', 'lgname' : user, 'lgpassword' : password, 'lgtoken' : token} data = self.http_request(wiki, uri, params, 'POST', additional_headers={'Cookies' : result_xml.attrib['cookieprefix'] + '_session=' + result_xml.attrib['sessionid'] + ';'}) try: tree = xml.etree.ElementTree.fromstring(data) result_xml = list(tree.iter('login'))[0] status = result_xml.attrib['result'] except: return False if status != 'Success': return False else: self.logged_in = True self.cookies[wiki][result_xml.attrib['cookieprefix'] + 'Token'] = result_xml.attrib['lgtoken'] self.cookies[wiki][result_xml.attrib['cookieprefix'] + '_session'] = result_xml.attrib['sessionid'] def logout(self): wiki = self.main_wiki uri = '/api.php' params = {'action' : 'logout'} text = self.http_request(wiki, uri, params) #delete cookies self.cookies = {} self.logged_in = False return True # # Gets the edit token and if wanted further information about the given page # # page_name : (string) the name of the page we want to edit # wiki : (string) the wiki we want to get the edit token in # bool_return_info : (bool) return just the token or more information? def get_edit_token(self, page_name, wiki=None, bool_return_info=False): if not self.logged_in: return False if not wiki: wiki = list(self.wiki_list.keys())[0] page_name = page_name.replace(' ', '_'); uri = '/api.php' params = {'action' : 'query', 'intoken' : 'edit', 'titles' : page_name, 'format': 'xml'} if bool_return_info: params['prop'] = 'info|revisions' else: params['prop'] = 'info' text = self.http_request(wiki, uri, params) try: tree = xml.etree.ElementTree.fromstring(text) result_xml = list(tree.iter('page'))[0] token = result_xml.attrib['edittoken'] except: return False if bool_return_info: data = result_xml.attrib data.update(list(tree.iter('rev'))[0].attrib) data['token'] = token; return data else: if token != '+\\': return token else: return False # # Edits the given page # # page_name : (string) the name of the page we want to edit # text : (string) text to place on the page # wiki : (string) the wiki we want to edit in # summary : (string) the edit summary to give # minor : (bool) is it a minor edit # edit_token : (string) Edit token to use # basetimestamp : (string) basetimestamp of the last page edit (edit conflict protection) def edit_page(self, page_name, text, wiki=None, summary=None, minor=False, edit_token=None, basetimestamp=None): page_name = page_name.replace(' ', '_'); if not wiki: wiki = list(self.wiki_list.keys())[0] if not edit_token: edit_token = self.get_edit_token(page_name, wiki); if not edit_token: #Still no valid edit token return False uri = '/api.php' params = {'action' : 'edit', 'title' : page_name, 'text' : text, 'format' : 'xml', 'token' : edit_token}; if summary: params['summary'] = summary if minor: params['minor'] = 1 if basetimestamp: params['basetimestamp'] = basetimestamp data = self.http_request(wiki, uri, params, 'POST', additional_headers = {'Content-Type' : "application/x-www-form-urlencoded; charset=UTF-8"}) try: tree = xml.etree.ElementTree.fromstring(data) result_xml = list(tree.iter('edit'))[0] status = result_xml.attrib except: try: tree = xml.etree.ElementTree.fromstring(data) result_xml = list(tree.iter('error'))[0] return result_xml.attrib except: return False if status['result'] == 'Success': return status else: return False # # Gets data out of the logs and returns them as dict # #Arguments are basicly the same as the API arguments (see http://www.mediawiki.org/wiki/API:Logevents) def log_data(self, letype, wiki=None, leprop = ['ids', 'title', 'type', 'user', 'timestamp', 'comment', 'details'], leuser=None, letitle=None, lestart=None, leend=None, ledir='older', limit=50, bool_return_lestart = True): if not wiki: wiki = list(self.wiki_list.keys())[0] uri = '/api.php' params = {'action' : 'query', 'list' : 'logevents', 'letype' : letype, 'ledir' : ledir, 'lelimit' : limit, 'format': 'xml'} if leuser: params['leuser'] = leuser if letitle: letitle = letitle.replace(' ', '_'); params['letitle'] = letitle if lestart: params['lestart'] = lestart if leend: params['leend'] = leend for prop in leprop: if 'leprop' in params: params['leprop'] += '|' + prop else: params['leprop'] = prop text = self.http_request(wiki, uri, params) try: tree = xml.etree.ElementTree.fromstring(text) result_xml = list(tree.iter('item')) additional_data = None additional_xml = None if 'details' in leprop and (letype == 'block' or letype == 'rights' or letype == 'move' or letype == 'patrol'): additional_xml = list(tree.iter(letype)) if 'details' in leprop and (letype == 'protect' or letype == 'newusers'): tmp = list(tree.iter('param')) i = 0 additional_data = {} for y in tmp: if y.text: additional_data[i] = y.text i = i + 1 except: return False i = 0 data = {} for tmp in result_xml: data[i] = tmp.attrib i = i + 1 if additional_xml: i = 0 for tmp in additional_xml: data[i].update(tmp.attrib) i = i + 1 if additional_data: i = 0 for x, y in additional_data.items(): #I have absolutely no clue why a simple for loop isn't enough here data[i]['param'] = y i = i + 1 if not 0 in data: return False if bool_return_lestart: try: tmp = list(tree.iter('logevents'))[1] lestart = tmp.attrib['lestart'] data['lestart'] = lestart except: data['lestart'] = False return data def convert_time(self, timestamp, mode='decode'): format = '%Y-%m-%dT%H:%M:%SZ' if mode == 'decode': if type(timestamp).__name__ == 'list': for i in timestamp: try: data.append(datetime.datetime.strptime(i, format).timetuple()) except: return False return data else: try: return datetime.datetime.strptime(timestamp, format).timetuple() except: return False else: if type(timestamp).__name__ == 'list': for i in timestamp: try: data.append(time.strftime(format, i)) except: return False return data else: try: return time.strftime(format, timestamp) except: return False # # This function returns information about the given user(s) # # users : (list) user to gather informations about # wiki : (string) wiki to look in # extended : (bool) get extended information def user_info(self, users, wiki=None, extended=False): if not wiki: wiki = self.main_wiki uri = '/api.php' user_string = '|'.join(users) params = {'action' : 'query', 'format' : 'xml', 'list' : 'users', 'ususers' : user_string} if extended: params['usprop'] = 'blockinfo|editcount|registration|emailable|gender' data = self.http_request(wiki, uri, params) try: tree = xml.etree.ElementTree.fromstring(data) result_xml = tree.iter('user') except: return false result = {} for user in result_xml: if 'missing' in user.attrib or 'invalid' in user.attrib: result[user.attrib['name']] = False continue result[user.attrib['name']] = user.attrib return result # #this class extends wiki with some WMF specific functions # class wmf(wiki): # # Creates a wiki object with the WMF-wikis matching the arguments and logs in as the given user # # user : (string) the user you want to log in as # password : (string) the password # user_agent : (string) the user agent sent to the servers # login_wiki : (string) the wiki the login should be performed on # include_specials : (bool) include special wikis (like commons, meta, ...) # include_closed : (bool) include closed wikis # include_private : (bool) include private wikis # include_fishbowl : (bool) include wikis on fishbowl # include_projects : (list) projects to include (like wikibooks), only applies to normal wikis, wikipedias are named 'wiki' # additional_wikis : (dict) additional wikis to include def __init__(self, user=False, password=False, user_agent=None, login_wiki=False, include_specials = True, include_closed = False, include_private = False, include_fishbowl = False, include_projects = None, additional_wikis = None): if not user_agent: user_agent = default_user_agent wiki_list = self.site_matrix(include_specials, include_closed, include_private, include_fishbowl, include_projects, user_agent) if additional_wikis: wiki_list.update(additional_wikis) wiki.__init__(self, wiki_list, login_wiki, user, password, user_agent) # # returns a dict with all WMF-wikis matching the given arguments # the wikinames are the dbnames from http://meta.wikimedia.org/w/api.php?action=sitematrix&format=xml # # include_specials : (bool) include special wikis (like commons, meta, ...) # include_closed : (bool) include closed wikis # include_private : (bool) include private wikis # include_fishbowl : (bool) include wikis on fishbowl # include_projects : (list) projects to include (like wikibooks), only applies to normal wikis, wikipedias are named 'wiki' # user_agent : (string) the user agent sent to the servers def site_matrix(self, include_specials = True, include_closed = False, include_private = False, include_fishbowl = False, include_projects = None, user_agent = None): wikis = {} if not user_agent: user_agent = default_user_agent try: tmp = urllib.request.Request('http://meta.wikimedia.org/w/api.php?action=sitematrix&format=xml', headers={'User-Agent' : user_agent}); result = urllib.request.urlopen(tmp) data = result.read() tree = xml.etree.ElementTree.fromstring(data) except: return false #normal wikis languages = tree.iter('language') for lang in languages: sites = list(lang.iter('site')) for site in list(sites)[1:]: if include_projects and site.attrib['code'] not in include_projects: continue if not include_closed and 'closed' in site.attrib: continue wikis[site.attrib['dbname']] = site.attrib['url'] + '/w' #special wikis if include_specials: specials = tree.iter('special') for special in specials: if not include_closed and 'closed' in special.attrib: continue if not include_private and 'private' in special.attrib: continue if not include_fishbowl and 'fishbowl' in special.attrib: continue wikis[special.attrib['dbname']] = special.attrib['url'] + '/w' return wikis