#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
This script executes an SQL query which finds links needing disambiguation.
It presents these links in either human- or machine-readable formats. It can
be used across multiple languages and wikis when
[[MediaWiki:Disambiguationspage]] is set up.

These command line parameters can be used to specify which pages to work on:

&params;

-format:X         Changes the output format; options are JSON, XML, and HTML.
                  HTML operates a little differently, limiting output to
                  useful information, and requires webpywikipedia to work.

-callback:        When specified with format as JSON, wraps the output in the
                  given function.
"""
import wikipedia, pagegenerators
import MySQLdb
import cgitb; cgitb.enable(logdir='tracebacks')

# Maps the placeholder used in the module docstring to the page-generator
# parameter help text.  The key must match the docstring text exactly.
# NOTE(review): presumably consumed by the framework's help output -- confirm.
docuReplacements = {
    '&params;': pagegenerators.parameterHelp,
}

# TODO language translation

# Name of the database the shared cursor is currently connected to
# (None until a connection has been opened).
db_cache = None
# Shared database cursor reused by the query functions in this file.
c = None
def query(dbName, query, inputs, charset=None):
    """Run one SQL statement against a wiki database and return its first row.

    The connection and cursor live in module-level globals and are reused
    for as long as consecutive calls target the same database.

    Parameters:
        dbName  - database name without the '_p' suffix; the host name is
                  derived from it by replacing '_' with '-'.
        query   - SQL text with %s placeholders.
        inputs  - values bound to the placeholders by the driver.
        charset - passed to MySQLdb.connect() when a new connection is
                  opened; it has no effect when the cached connection is
                  reused.

    Returns the result of cursor.fetchone() for the statement.
    """
    global db_cache, db, c
    if db_cache != dbName:
        if c:
            c.close()
            # Close the previous connection as well as its cursor;
            # otherwise every database switch leaks a server connection.
            db.close()
        # Forget the old handles before reconnecting so that a failed
        # connect cannot leave the cache naming a closed connection.
        c = None
        db_cache = None
        db = MySQLdb.connect(
            db=dbName + '_p',
            host=dbName.replace('_', '-') + '-p.db.toolserver.org',
            read_default_file="/home/dispenser/.my.cnf",
            charset=charset
        )
        c = db.cursor()
        db_cache = dbName
    c.execute(query, inputs)
    return c.fetchone()
def getSelfRedirects(dbName, namespace, title):
    """Return redirects that the given page links to and that point back to it.

    Parameters:
        dbName    - database name without the '_p' suffix.
        namespace - namespace number of the page.
        title     - page title as a unicode string; it is compared to
                    page_title as-is, so it must already be in database
                    form (the caller passes it with underscores).

    Returns all rows as (page_namespace, page_title) of the redirect pages,
    ordered by namespace and title.
    """
    title = title.encode('utf-8')
    global db_cache, db, c
    if db_cache != dbName:
        if c:
            c.close()
            # Close the previous connection as well as its cursor;
            # otherwise every database switch leaks a server connection.
            db.close()
        # Forget the old handles before reconnecting so that a failed
        # connect cannot leave the cache naming a closed connection.
        c = None
        db_cache = None
        db = MySQLdb.connect(
            db=dbName + '_p',
            host=dbName.replace('_', '-') + '-p.db.toolserver.org',
            read_default_file="/home/dispenser/.my.cnf"
        )
        c = db.cursor()
        c.execute("SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED")
        db_cache = dbName
    # The SQL text is kept flush-left so the statement sent to the server
    # is unchanged.
    c.execute("""
SELECT rd.page_namespace, rd.page_title
FROM page
JOIN pagelinks ON pl_from = page_id
JOIN page AS rd ON (rd.page_namespace = pl_namespace AND rd.page_title = pl_title)
JOIN redirect ON rd_from = rd.page_id
WHERE page.page_namespace = %s AND page.page_title = %s
AND page.page_namespace = rd_namespace AND page.page_title = rd_title
ORDER BY rd.page_namespace, rd.page_title
""",
        (namespace, title)
    )
    return c.fetchall()
def getDabLinks(dbName, namespace, title):
    """Return the disambiguation pages that the given page links to.

    A linked page counts as a disambiguation page when it transcludes a
    template (namespace 10) that is linked from the page
    "Disambiguationspage" in namespace 8.  The first half of the UNION
    covers direct links; the second half covers links to redirects whose
    target is such a page.

    Parameters:
        dbName    - database name without the '_p' suffix.
        namespace - namespace number of the page.
        title     - page title as a unicode string; spaces are converted to
                    underscores inside the SQL.

    Returns all rows as
    (pl_namespace, pl_title, links_back, tl_title, rd_namespace, rd_title),
    where links_back is the EXISTS result telling whether the disambiguation
    page links back to the given page, and rd_namespace/rd_title are NULL
    for direct links.
    """
    title = title.encode('utf-8')
    global db_cache, db, c
    if db_cache != dbName:
        if c:
            c.close()
            # Close the previous connection as well as its cursor;
            # otherwise every database switch leaks a server connection.
            db.close()
        # Forget the old handles before reconnecting so that a failed
        # connect cannot leave the cache naming a closed connection.
        c = None
        db_cache = None
        db = MySQLdb.connect(
            db=dbName + '_p',
            host=dbName.replace('_', '-') + '-p.db.toolserver.org',
            read_default_file="/home/dispenser/.my.cnf"
        )
        c = db.cursor()
        c.execute("SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED")
        db_cache = dbName
    # The SQL text is kept flush-left so the statement sent to the server
    # is unchanged.
    c.execute(
        """
SELECT pl_namespace, pl_title, EXISTS (
SELECT 1
FROM pagelinks
WHERE pl_from = page_id
AND pl_namespace=%s AND pl_title=REPLACE(%s,' ','_')
), tl_title, NULL AS rd_namespace, NULL AS rd_title
FROM pagelinks
JOIN page ON (page_title = pl_title AND page_namespace = pl_namespace)
JOIN templatelinks ON tl_from = page_id
WHERE pl_from = (
SELECT page_id
FROM page
WHERE page_namespace=%s AND page_title=REPLACE(%s,' ','_')
)
AND tl_namespace = 10
AND tl_title IN (
SELECT pl_title
FROM pagelinks
JOIN page ON pl_from = page_id
WHERE pl_namespace=10 AND page_namespace=8 AND page_title="Disambiguationspage"
)
GROUP BY pl_namespace, pl_title
UNION
SELECT pl_namespace, pl_title, EXISTS (
SELECT 1
FROM pagelinks
WHERE pl_from = rd.page_id
AND pl_namespace=%s AND pl_title=REPLACE(%s,' ','_')
), tl_title, rd_namespace, rd_title
FROM pagelinks
JOIN page ON (page_title = pl_title AND page_namespace = pl_namespace)
JOIN redirect ON rd_from = page_id
JOIN page AS rd ON (rd.page_namespace = rd_namespace AND rd.page_title = rd_title)
JOIN templatelinks ON tl_from = rd.page_id
WHERE pl_from = (
SELECT page_id
FROM page
WHERE page_namespace=%s AND page_title=REPLACE(%s,' ','_')
)
AND tl_namespace = 10
AND tl_title IN (
SELECT pl_title
FROM pagelinks
JOIN page ON pl_from = page_id
WHERE pl_namespace=10 AND page_namespace=8 AND page_title="Disambiguationspage"
)
GROUP BY pl_namespace, pl_title
ORDER BY pl_namespace, pl_title
""",
        # The statement has four (namespace, title) placeholder pairs:
        # one EXISTS check and one page lookup in each half of the UNION.
        (namespace, title) * 4
    )
    return c.fetchall()
def api_getDabLinks(dbName, ns, title):
    """Yield disambiguation pages linked from a page, using the web API.

    Generator that walks the links of `title` through api.php (20 links per
    request) and inspects the categories of each linked page.

    Parameters:
        dbName - unused; accepted so the signature parallels getDabLinks().
        ns     - namespace number of the page; only 0 is supported.
        title  - title of the page whose links are examined.

    Yields tuples shaped like the rows of getDabLinks():
    (namespace, title, 0, 'Template:', None, None) for every linked page
    that has a category whose title contains "All disambiguation pages".

    Raises ValueError (on first iteration) when `ns` is not 0 or the
    current site is not the English Wikipedia.
    """
    import time
    import urllib
    from xml.dom.minidom import parseString

    # String exceptions are not valid on Python 2.6+, so raise real ones.
    if ns != 0:
        raise ValueError("Only works on main space pages")
    site = wikipedia.getSite()
    # The original test was inverted: it rejected the one site the message
    # says is supported.
    if site.sitename() != "wikipedia:en":
        raise ValueError("API only setup for English Wikipedia")
    gplcontinue = None
    apipath = "/w/api.php"
    data = {
        'action': 'query',
        'format': 'xml',
        #
        #'redirects':'redirects',
        'prop': 'categories',
        'cllimit': '500',
        #
        'generator': 'links',
        'gpllimit': '20',
        'titles': title,
    }
    while True:
        #wikipedia.output(site.getUrl(apipath, data=data))
        dom = parseString(site.getUrl(apipath, data=data))
        #wikipedia.output(dom.documentElement.toxml())
        for page in dom.documentElement.getElementsByTagName('page'):
            cl = page.getElementsByTagName('cl')
            if not cl:
                # No categories in the batch result: ask again for this
                # page alone, this time resolving redirects.
                xml = site.getUrl(apipath + "?action=query&format=xml&redirects&prop=categories&cllimit=50&titles=%s" % urllib.quote(page.getAttribute('title').encode('utf-8')))
                cl = parseString(xml).getElementsByTagName('cl')
                time.sleep(.1)
                # Single-argument print calls behave the same on Python 2
                # and Python 3.
                print('+')
            if any("All disambiguation pages" in node.getAttribute('title') for node in cl):
                # last two -> rd_ns
                yield (int(page.getAttribute('ns')), page.getAttribute('title'), 0, 'Template:', None, None,)
            elif any("disambiguation" in node.getAttribute('title') for node in cl):
                print('%s could be one' % page.getAttribute('title'))
        time.sleep(.1)
        try:
            gplcontinue = dom.getElementsByTagName('query-continue')[0].getElementsByTagName('links')[0].getAttribute('gplcontinue').encode('utf-8')
            print("\n%s" % gplcontinue)
            data['gplcontinue'] = gplcontinue
        except IndexError:
            # No continuation element: every link has been processed.
            break
    else:
        # NOTE(review): indentation was lost in the source; this else is
        # attached to the while loop, where it cannot run because the loop
        # only exits through break.  Confirm the intended placement.
        wikipedia.output('Done')
def main():
genFactory = pagegenerators.GeneratorFactory()
format = "html"
callback = None
# Up the limit for genFactory
genFactory.limit = 500
for arg in wikipedia.handleArgs():
if arg.startswith('-format:'):
format = arg[8:]
elif arg.startswith('-callback:'):
callback = arg[10:]
else:
genFactory.handleArg(arg)
generator = genFactory.getCombinedGenerator() or iter([])
# FIXME this has the nasty effect it will cause the genFactory to repopulate the results look into fixing genFactory or handleUrlAndHeader()
if format=="html":
if not wikipedia.handleUrlAndHeader():return
wikipedia.startContent(form=True)
try:
for page in pagegenerators.DuplicateFilterPageGenerator(generator):
site=page.site()
def htmlLink(title, redirect=False):
return ('%s'%(site.protocol(), site.hostname(), redirect and site.get_address(title.replace(' ', '_')) or site.nice_get_address(title.replace(' ', '_')), title.replace('_', ' '), title.replace('_', ' '), ))
# TODO remove this casing & redirect hack
title = page.titleWithoutNamespace(underscore=True).encode('utf-8')
exists = query(site.dbName(), "select (select page_is_redirect from page where page_namespace=%s and page_title=%s), (select page_is_redirect from page where page_namespace=%s and page_title=%s)", (page.namespace(), title, page.namespace(), title[0].upper()+title[1:]))
if 1 in exists:
wikipedia.output("%s is a redirect page"%page.aslink())
continue
elif exists == (None, 0):
page._title = page._title[0].upper()+page._title[1:]
elif exists == (None, None):
wikipedia.output("%s does not exist"%page.aslink())
continue
#try:
results = [result for result in getDabLinks(site.dbName(), page.namespace(), page.titleWithoutNamespace(underscore=True)) if not result[2]]
#except:
# results = [result for result in api_getDablinks(site.dbName(),, page.namespace(), page.titleWithoutNamespace(underscore=True)) if not result[2]]
if results:
print '
%s links to %d %s. %s
'%(htmlLink(page.title().encode('utf-8')), len(results), len(results)!=1 and 'different disambiguation pages' or 'disambiguation page', '[fix links]'%(page.aslink()[2:-2].encode('utf-8')) ) print 'No disambiguation links on %s.
'%(htmlLink(page.title().encode('utf-8'))) results = getSelfRedirects(site.dbName(), page.namespace(), page.titleWithoutNamespace(underscore=True)) if results: print '%s links to %d redirect%s which points back.
'%(htmlLink(page.title().encode('utf-8')), len(results), len(results)!=1 and 's' or '') print '