#!/usr/bin/env python # -*- coding: utf-8 -*- """ This script executes an SQL query which finds links needing disambiguation. It presents these links either human or machine readable formats. It is able to be used across multiple languages and wikis when [[MediaWiki: Disambiguationspage]] is setup. These command line parameters can be used to specify which pages to work on: ¶ms; -format:X Changes the output format, options are JSON, XML, and HTML. HTML operates a little differently limiting to useful information and require webpywikipedia to work. -callback: When specified with format as JSON wraps the output into a given function. """ import wikipedia, pagegenerators import MySQLdb import cgitb; cgitb.enable(logdir='tracebacks') docuReplacements = { '¶ms;': pagegenerators.parameterHelp, } # TODO language translation db_cache = None c = None def query(dbName, query, inputs, charset=None): # reuse conection global db_cache, db, c if db_cache != dbName: if c:c.close() db = MySQLdb.connect(db=dbName+'_p', host=dbName.replace('_', '-') + '-p.db.toolserver.org', read_default_file="/home/dispenser/.my.cnf", charset=charset) db_cache = dbName c=db.cursor() c.execute(query, inputs) result = c.fetchone() return result def getSelfRedirects(dbName, namespace, title): title = title.encode('utf-8') # reuse conection global db_cache, db, c if db_cache != dbName: if c:c.close() db = MySQLdb.connect(db=dbName+'_p', host=dbName.replace('_', '-') + '-p.db.toolserver.org', read_default_file="/home/dispenser/.my.cnf") db_cache = dbName c=db.cursor() c.execute("SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED") c.execute(""" SELECT rd.page_namespace, rd.page_title FROM page JOIN pagelinks ON pl_from = page_id JOIN page AS rd ON (rd.page_namespace = pl_namespace AND rd.page_title = pl_title) JOIN redirect ON rd_from = rd.page_id WHERE page.page_namespace = %s AND page.page_title = %s AND page.page_namespace = rd_namespace AND page.page_title = rd_title ORDER BY rd.page_namespace, rd.page_title """, (namespace, title) ) results = c.fetchall() return results def getDabLinks(dbName, namespace, title): title = title.encode('utf-8') # reuse conection global db_cache, db, c if db_cache != dbName: if c:c.close() db = MySQLdb.connect(db=dbName+'_p', host=dbName.replace('_', '-') + '-p.db.toolserver.org', read_default_file="/home/dispenser/.my.cnf") db_cache = dbName c=db.cursor() c.execute("SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED") c.execute( """ SELECT pl_namespace, pl_title, EXISTS ( SELECT 1 FROM pagelinks WHERE pl_from = page_id AND pl_namespace=%s AND pl_title=REPLACE(%s,' ','_') ), tl_title, NULL AS rd_namespace, NULL AS rd_title FROM pagelinks JOIN page ON (page_title = pl_title AND page_namespace = pl_namespace) JOIN templatelinks ON tl_from = page_id WHERE pl_from = ( SELECT page_id FROM page WHERE page_namespace=%s AND page_title=REPLACE(%s,' ','_') ) AND tl_namespace = 10 AND tl_title IN ( SELECT pl_title FROM pagelinks JOIN page ON pl_from = page_id WHERE pl_namespace=10 AND page_namespace=8 AND page_title="Disambiguationspage" ) GROUP BY pl_namespace, pl_title UNION SELECT pl_namespace, pl_title, EXISTS ( SELECT 1 FROM pagelinks WHERE pl_from = rd.page_id AND pl_namespace=%s AND pl_title=REPLACE(%s,' ','_') ), tl_title, rd_namespace, rd_title FROM pagelinks JOIN page ON (page_title = pl_title AND page_namespace = pl_namespace) JOIN redirect ON rd_from = page_id JOIN page AS rd ON (rd.page_namespace = rd_namespace AND rd.page_title = rd_title) JOIN templatelinks ON tl_from = rd.page_id WHERE pl_from = ( SELECT page_id FROM page WHERE page_namespace=%s AND page_title=REPLACE(%s,' ','_') ) AND tl_namespace = 10 AND tl_title IN ( SELECT pl_title FROM pagelinks JOIN page ON pl_from = page_id WHERE pl_namespace=10 AND page_namespace=8 AND page_title="Disambiguationspage" ) GROUP BY pl_namespace, pl_title ORDER BY pl_namespace, pl_title """, (namespace, title, namespace, title, namespace, title, namespace, title,) ) results = c.fetchall() return results def api_getDabLinks(dbName, ns, title): import urllib, time from xml.dom.minidom import parseString if ns!=0:raise "Only works on main space pages" site = wikipedia.getSite() if site.sitename() == "wikipedia:en":raise "API only setup for English Wikipedia" gplcontinue = None apipath = "/w/api.php" data = { 'action':'query', 'format':'xml', # #'redirects':'redirects', 'prop':'categories', 'cllimit':'500', # 'generator':'links', 'gpllimit':'20', 'titles':title, } while True: #wikipedia.output(site.getUrl(apipath, data=data)) dom = parseString(site.getUrl(apipath, data=data)) #wikipedia.output(dom.documentElement.toxml()) for page in dom.documentElement.getElementsByTagName('page'): cl = page.getElementsByTagName('cl') if cl == []: xml = site.getUrl(apipath+"?action=query&format=xml&redirects&prop=categories&cllimit=50&titles=%s"%urllib.quote(page.getAttribute('title').encode('utf-8'))) cl = parseString(xml).getElementsByTagName('cl') time.sleep(.1) print '+' if any((True for node in cl if "All disambiguation pages" in node.getAttribute('title'))): yield (int(page.getAttribute('ns')), page.getAttribute('title'), 0, 'Template:', None, None,)# last two -> rd_ns elif any((True for node in cl if "disambiguation" in node.getAttribute('title'))): print page.getAttribute('title'), 'could be one' time.sleep(.1) try: gplcontinue = dom.getElementsByTagName('query-continue')[0].getElementsByTagName('links')[0].getAttribute('gplcontinue').encode('utf-8') print "
", gplcontinue data['gplcontinue'] = gplcontinue except IndexError: break else: wikipedia.output('Done') def main(): genFactory = pagegenerators.GeneratorFactory() format = "html" callback = None # Up the limit for genFactory genFactory.limit = 500 for arg in wikipedia.handleArgs(): if arg.startswith('-format:'): format = arg[8:] elif arg.startswith('-callback:'): callback = arg[10:] else: genFactory.handleArg(arg) generator = genFactory.getCombinedGenerator() or iter([]) # FIXME this has the nasty effect it will cause the genFactory to repopulate the results look into fixing genFactory or handleUrlAndHeader() if format=="html": if not wikipedia.handleUrlAndHeader():return wikipedia.startContent(form=True) try: for page in pagegenerators.DuplicateFilterPageGenerator(generator): site=page.site() def htmlLink(title, redirect=False): return ('%s'%(site.protocol(), site.hostname(), redirect and site.get_address(title.replace(' ', '_')) or site.nice_get_address(title.replace(' ', '_')), title.replace('_', ' '), title.replace('_', ' '), )) # TODO remove this casing & redirect hack title = page.titleWithoutNamespace(underscore=True).encode('utf-8') exists = query(site.dbName(), "select (select page_is_redirect from page where page_namespace=%s and page_title=%s), (select page_is_redirect from page where page_namespace=%s and page_title=%s)", (page.namespace(), title, page.namespace(), title[0].upper()+title[1:])) if 1 in exists: wikipedia.output("%s is a redirect page"%page.aslink()) continue elif exists == (None, 0): page._title = page._title[0].upper()+page._title[1:] elif exists == (None, None): wikipedia.output("%s does not exist"%page.aslink()) continue #try: results = [result for result in getDabLinks(site.dbName(), page.namespace(), page.titleWithoutNamespace(underscore=True)) if not result[2]] #except: # results = [result for result in api_getDablinks(site.dbName(),, page.namespace(), page.titleWithoutNamespace(underscore=True)) if not result[2]] if results: print '

%s links to %d %s. %s

'%(htmlLink(page.title().encode('utf-8')), len(results), len(results)!=1 and 'different disambiguation pages' or 'disambiguation page', '[fix links]'%(page.aslink()[2:-2].encode('utf-8')) ) print '' else: print '

No disambiguation links on %s.

'%(htmlLink(page.title().encode('utf-8'))) results = getSelfRedirects(site.dbName(), page.namespace(), page.titleWithoutNamespace(underscore=True)) if results: print '

%s links to %d redirect%s which points back.

'%(htmlLink(page.title().encode('utf-8')), len(results), len(results)!=1 and 's' or '') print '' finally: wikipedia.endContent() elif format=="python": out = { "query": { "pages": { } } } for page in generator: # should be pageid not pagetitle out["query"]["pages"][page.title()] = { "pageid": None, "ns": page.namespace(), "title": page.title().encode('utf-8'), "disambiguationlinks":[], } results = getDabLinks(page.site().dbName(), page.namespace(), page.titleWithoutNamespace(underscore=True)) for result in results: item = { "ns":result[0], "title": result[1].replace('_', ' '), "linksback": bool(result[2]), "template": result[3], } if result[5]: item["targetns"]=result[4] item["target"]=result[5] out["query"]["pages"][page.title()]["disambiguationlinks"].append(item) #out["query"]["pages"][page.title()]["pageid"]=None print 'Content-type: application/python' print print "%r"%out elif format=="php": pass # FIXME json hack until wrapper works elif format=="json": print "Content-type: application/json; charset=utf-8" print "Content-Disposition: attachment; filename=dablinks.json;" print output = '{"query":{"pages":{' pagenum = 0 for page in generator: results = getDabLinks(page.site().dbName(), page.namespace(), page.titleWithoutNamespace(underscore=True)) if results: if pagenum != 0: output+=',' output += '"%s":{"pageid":%s, "ns":%d, "title":"%s", "disambiguationlinks":['%(pagenum, 'null', page.namespace(), page.title().encode('utf-8')) resultnum = 0 for result in results: if resultnum!=0: output+=',' if not result[5]: output += '{"ns":%d, "title":"%s", "linksback": %s, "template":"%s"}'%result[0:4] else: output += '{"ns":%d, "title":"%s", "linksback": %s, "template":"%s", "targetns":%d, "target":"%s"}'%result resultnum += 1 output += ']}' pagenum += 1 output += '}}}' # FIXME move this conversion step into the title conconalicazation output = output.replace('_', ' ') if callback: print "%s(%s)"%(callback, output) else: print output # xml interface until wrapper works elif format=="xml": print "Content-type: application/xml; charset=utf-8" print print '' print '' print '' print '' for page in generator: results = getDabLinks(page.site().dbName(), page.namespace(), page.titleWithoutNamespace(underscore=True)) if results: print ''%(page.namespace(), page.title().encode('utf-8')) print '' for result in results: if not result[5]: print '
'%(result[:-2]) else: print '
'%result print '' print '' print '' print '' print '' if __name__ == "__main__": try: main() finally: if c:c.close() wikipedia.stopme()