SQL/XML Dumps/Stubs, page logs, abstracts
How (not) to generate stubs, page logs, and abstract dumps
TL;DR: It's complicated. Too complicated.
And now, here's the long version.
Among the other items we dump are metadata for each page and revision (“stubs”), the logs of what actions have been taken on pages (moves, deletions, patrolling, page creation and so on), and abstracts, which are small snippets of each page taken from the introductory section of the content.
As we do for all dumps, we try to break these into small pieces, so that if any one piece fails, we can retry it a few times before giving up completely. Giving up would mean that we have to restart the job from the beginning, and for larger wikis, even though the output files are broken into pieces, each piece may still take days to generate.
Let’s see how we do this for page logs, the simplest of the cases. The basics of the code flow are the same for the other two jobs.
Generation of page logs
Let’s look at xmllogs.py ([1]).
#!/usr/bin/python3
'''
generate an xml dump via multiple runs of a php script instead of one
long run.
avoids memory leak issues, permits retries when a single run fails,
recovery if db servers go away in the middle of a run by retrying
the run.
'''
import os
import sys
import getopt
from dumps.wikidump import Config
from dumps.fileutils import FileUtils
from dumps.utils import MultiVersion
That import of gzippit_append should tip us off that something ugly is coming our way.
from xmlstreams import gzippit_append, do_xml_stream
We’ll look at the dologsbackup method in a little bit. Let’s look at the main part of the script first.
The xmllogs.py script takes an optional start and end log id range, so that we can generate multiple output files for the big wikis (6 output files from 6 parallel processes) and the huge wikis (27 output files for the same number of processes for enwiki and wikidatawiki). For the other wikis a single output file is generated. This is true also for the stub and abstract dumps.
Otherwise the script takes the standard args: verbose and dryrun which do what you would expect, the name of the wiki which is of course required, the path to the dumps config file, and the full path of the output file.
def usage(message=None):
    """
    display a helpful usage message with
    an optional introductory message first
    """
    if message is not None:
        sys.stderr.write(message)
        sys.stderr.write("\n")
    usage_message = """
Usage: xmllogs.py --wiki wikidbname --outfile path
    [--start number] [--end number]
    [--config path[:overrides]] [--dryrun] [--verbose]
Options:
  --wiki (-w): wiki db name, e.g. enwiki
  --outfile (-o): full path to xml logs dump that will be created
  --start (-s): starting log id to dump (default: 1)
  --end (-e): ending log id to dump, exclusive of this entry (default: dump all)
  --config (-C): path to wikidump configfile (default: "wikidump.conf" in current dir)
                 if followed by : and a name, this section name in the config file
                 will be used to override config settings in default sections
  --dryrun (-d): display the commands that would be run to produce the output but
                 don't actually run them
  --verbose (-v): display the commands as they are run
"""
    sys.stderr.write(usage_message)
    sys.exit(1)
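To make the --start and --end semantics concrete, here is a minimal sketch of how a caller might split a log id range across several parallel xmllogs.py runs. The helper names split_id_range and build_commands, and the example output path, are made up for illustration; the real dumps scheduler may well pick its ranges differently. The only thing taken from the script itself is that --end is exclusive.

```python
def split_id_range(max_id, parts):
    """
    split ids 1..max_id into at most `parts` consecutive ranges,
    returned as (start, end) pairs where end is exclusive, matching
    the --start/--end convention of xmllogs.py
    (hypothetical helper, not part of the dumps code; assumes parts >= 1)
    """
    chunk = -(-max_id // parts)  # ceiling division
    ranges = []
    start = 1
    while start <= max_id:
        end = min(start + chunk, max_id + 1)
        ranges.append((start, end))
        start = end
    return ranges


def build_commands(wiki, max_id, parts, outfile_template):
    """
    build one xmllogs.py command line per range; outfile_template
    is an assumed naming scheme with a {num} placeholder
    """
    commands = []
    for num, (start, end) in enumerate(split_id_range(max_id, parts), 1):
        commands.append(["python3", "xmllogs.py", "--wiki", wiki,
                         "--outfile", outfile_template.format(num=num),
                         "--start", str(start), "--end", str(end)])
    return commands


if __name__ == '__main__':
    for cmd in build_commands("testwiki", 100, 6, "/tmp/testwiki-pagelogs{num}.xml.gz"):
        print(" ".join(cmd))
```

Because each end value is exclusive, it doubles as the start of the next range, so no log id is dumped twice or skipped. For very small ranges the helper can return fewer ranges than requested.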
OK, we get all the args, we parse the config file and add in the specific settings for our wiki, and we call the one method of interest: dologsbackup(). We'll look at that next.
def main():
    'main entry point, does all the work'
    wiki = None
    output_file = None
    start = None
    end = None
    configfile = "wikidump.conf"
    dryrun = False
    verbose = False
    try:
        (options, remainder) = getopt.gnu_getopt(
            sys.argv[1:], "w:o:s:e:C:dhv",
            ["wiki=", "outfile=",
             "start=", "end=", "config=",
             "help", "dryrun", "verbose"])
    except getopt.GetoptError as err:
        usage("Unknown option specified: " + str(err))
    for (opt, val) in options:
        if opt in ["-w", "--wiki"]:
            wiki = val
        elif opt in ["-o", "--outfile"]:
            output_file = val
        elif opt in ["-s", "--start"]:
            start = val
        elif opt in ["-e", "--end"]:
            end = val
        elif opt in ["-C", "--config"]:
            configfile = val
        elif opt in ["-d", "--dryrun"]:
            dryrun = True
        elif opt in ["-v", "--verbose"]:
            verbose = True
        elif opt in ["-h", "--help"]:
            usage('Help for this script\n')
        else:
            usage("Unknown option specified: <%s>" % opt)
    if remainder:
        usage("Unknown option(s) specified: <%s>" % remainder[0])
    if wiki is None:
        usage("mandatory argument missing: --wiki")
    if output_file is None:
        usage("mandatory argument missing: --outfile")
    if start is not None:
        if not start.isdigit():
            usage("value for --start must be a number")
        else:
            start = int(start)
    if end is not None:
        if not end.isdigit():
            usage("value for --end must be a number")
        else:
            # --end is exclusive; convert it to the last id actually dumped
            end = int(end) - 1
    wikiconf = Config(configfile)
    wikiconf.parse_conffile_per_project(wiki)
    dologsbackup(wiki, output_file, wikiconf, start, end, dryrun, verbose)
The obligatory “work around pylint whines” code ;-)
if __name__ == '__main__':
    main()
Now let's have a look at dologsbackup()
def dologsbackup(wikidb, outfile,
                 wikiconf, start, end, dryrun, verbose):
    '''
    do a logs xml dump one piece at a time, writing into uncompressed
    temporary files and shovelling those into gzip's stdin for the
    concatenated compressed output
    '''
Why use a dict (hash, associative array) for only one file type (logs)? Because when we generate stubs, that dump has three file types, and we want to reuse code wherever possible…
    outfiles = {'logs': {'name': outfile}}
    for filetype in outfiles:
We write into a file in the dumps temp dir and tack _tmp onto the end of the name for good measure.
        outfiles[filetype]['temp'] = os.path.join(
            FileUtils.wiki_tempdir(wikidb, wikiconf.temp_dir),
            os.path.basename(outfiles[filetype]['name']) + "_tmp")
Dryrun? We won’t compress at all. Because we won’t be actually running anything, heh.
        if dryrun:
            outfiles[filetype]['compr'] = [None, outfiles[filetype]['name']]
        else:
Yes we could be gzipping a file which ends in .bz2 or .txt, but that’s the caller’s problem for using a crap output file name :-P
            outfiles[filetype]['compr'] = [gzippit_append, outfiles[filetype]['name']]
This gets the path to the maintenance script to run, with the path to MWScript.php ([2]) prepended if needed (this is determined by the dumps config file).
    script_command = MultiVersion.mw_script_as_array(wikiconf, "dumpBackup.php")
We set up (part of) the command to be run. The --logs says to dump page logs rather than some other type of thing.
    command = [wikiconf.php] + script_command
    command.extend(["--wiki=%s" % wikidb,
                    "--logs", "--report=1000",
                    "--output=file:%s" % outfiles['logs']['temp']
                    ])
We write an xml header at the beginning of the file, with no other output. The header is gzipped all by itself. We write an xml footer at the end of the file, with no other output. It too is gzipped all by itself. We don’t want headers or footers in the middle, which contains all of the data. That too is a separate gzipped stream. When all of these are concatenated together, that is a valid gzip object which gzip tools can decompress with no special parameters required.
    do_xml_stream(wikidb, outfiles, command, wikiconf,
                  start, end, dryrun, 'log_id', 'logging',
                  50000, 100000, '</logitem>\n', verbose=verbose, header=True)
    do_xml_stream(wikidb, outfiles, command, wikiconf,
                  start, end, dryrun, 'log_id', 'logging',
                  50000, 100000, '</logitem>\n', verbose=verbose)
    do_xml_stream(wikidb, outfiles, command, wikiconf,
                  start, end, dryrun, 'log_id', 'logging',
                  50000, 100000, '</logitem>\n', verbose=verbose, footer=True)
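The "separately gzipped header, body and footer concatenate into one valid gzip file" claim is easy to check with Python's own gzip module. This is only a small demonstration of that gzip property, not the dumps code itself; the XML snippets are placeholders.

```python
import gzip

header = b"<mediawiki>\n"
body = b"  <logitem>placeholder</logitem>\n" * 3
footer = b"</mediawiki>\n"

# each piece is compressed on its own, giving three independent gzip members
members = [gzip.compress(piece) for piece in (header, body, footer)]

# plain byte concatenation, the same effect as three "gzip >> outfile" runs
combined = b"".join(members)

# a multi-member gzip stream decompresses to the concatenated plain text
assert gzip.decompress(combined) == header + body + footer
print(gzip.decompress(combined).decode('utf-8'))
```

This is also why recombining several output files later is cheap: the header and footer members can be dropped or kept without recompressing the body.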
Pretty simple, right? But now we get to look at xmlstreams.py ([3]) which is where do_xml_stream() is defined. Here we go!
Xmlstreams for stubs, page logs, abstracts
#!/usr/bin/python3
'''
generate an xml dump via multiple runs of a php script instead of one
long run.
avoids memory leak issues, permits retries when a single run fails,
recovery if db servers go away in the middle of a run by retrying
the run.
'''
import os
import sys
import time
import traceback
from subprocess import Popen, PIPE
from dumps.utils import DbServerInfo
from dumps.wikidump import Wiki
# fix all the error returns and make subroutines out of stuff
# current code puts together a command with a bunch of crap in it
Note that the table and the row id name for the table are passed in because we might be dumping info about pages or page logs.
def do_xml_stream(wikidb, outfiles, command, wikiconf,
                  start, end, dryrun, id_field, table,
                  small_interval, max_interval, ends_with,
                  verbose=False, callback=None, header=False, footer=False):
    '''
    do an xml dump one piece at a time, writing into uncompressed
    temporary files and shovelling those into gzip's stdin for the
    concatenated compressed output
    if header is True, write only the header
    if footer is True, write only the footer
    '''
If no start or end is passed in, we’ll dump the whole thing, from the first page log entry to the last one (hence the call to get_max_id to find out what the last one is). small_interval and max_interval are the number of items to dump in a batch which can be retried upon failure without having lost too much time on the failed run, so not days, let's say.
The small_interval value is used for tiny little wikis, basically a minimum size beyond which it’s silly to ask for smaller batches than that. The max_interval is the max number of rows we will dump in a batch even on a giant wiki.
    if start is None:
        start = 1
    interval = None
    if end is None:
        end = get_max_id(wikiconf, wikidb, id_field, table)
        # if the whole wiki is small enough, take
        # arbitrary hopefully reasonable slices
        if start == 1 and end < 1000000:
            interval = small_interval
    if interval is None:
        # hope this is not too awful a guess
        interval = int((int(end) - int(start)) / 50)
        if interval == 0:
            interval = 1
        elif interval > max_interval:
            interval = max_interval
    interval_save = interval
Just dump the header all by itself if the header was requested.
    if header:
        # get just the header
        piece_command = [field for field in command]
        piece_command.extend(["--skip-footer", "--start=1", "--end=1"])
        if not dryrun:
            for filetype in outfiles:
                outfiles[filetype]['process'] = outfiles[filetype]['compr'][0](
                    outfiles[filetype]['compr'][1])
        do_xml_piece(piece_command, outfiles, wikiconf, dryrun=dryrun, verbose=verbose)
        if not dryrun:
            for filetype in outfiles:
                outfiles[filetype]['process'].stdin.close()
            for filetype in outfiles:
                outfiles[filetype]['process'].wait()
Just dump the footer if that’s what’s requested. These are compressed files all on their own. In practice, they are gzipped files, since that's always the compression type wanted for stubs, page logs or abstracts. And we can concat them together with the body later to make a well-formed gzip file that is easy to manipulate later, for example for recombining several of these in a row, removing headers and footers on all but the first and last file in the sequence.
    elif footer:
        # get just the footer
        piece_command = [field for field in command]
        piece_command.extend(["--skip-header", "--start=1", "--end=1"])
        if not dryrun:
            for filetype in outfiles:
                outfiles[filetype]['process'] = outfiles[filetype]['compr'][0](
                    outfiles[filetype]['compr'][1])
        do_xml_piece(piece_command, outfiles, wikiconf, dryrun=dryrun, verbose=verbose)
        if not dryrun:
            for filetype in outfiles:
                outfiles[filetype]['process'].stdin.close()
            for filetype in outfiles:
                outfiles[filetype]['process'].wait()
Here we dump all the actual data ;-)
    else:
        if not dryrun:
            for filetype in outfiles:
                outfiles[filetype]['process'] = outfiles[filetype]['compr'][0](
                    outfiles[filetype]['compr'][1])
This callback stuff is only used for stubs and it’s complicated, so we’ll just skip right by it…
        if callback is not None:
            wiki = Wiki(wikiconf, wikidb)
            db_info = DbServerInfo(wiki, wikidb)
        upto = start
        while upto <= end:
            if callback is not None:
                interval = callback(upto, interval_save, wiki, db_info)
Here we will call do_xml_piece() passing in the actual maintenance script command.
            piece_command = [field for field in command]
            piece_command.append("--skip-header")
            piece_command.extend(["--start=%s" % str(upto)])
            piece_command.append("--skip-footer")
            if upto + interval <= end:
                piece_command.extend(["--end", str(upto + interval)])
            else:
                piece_command.extend(["--end", str(end + 1)])
            upto = upto + interval
            do_xml_piece(piece_command, outfiles, wikiconf, ends_with,
                         dryrun=dryrun, verbose=verbose)
Close up the output file and wait for all the processes to complete. We can wait for them in any order, since we can't move on to the next thing until they are all complete. That's it!
        if not dryrun:
            for filetype in outfiles:
                outfiles[filetype]['process'].stdin.close()
            for filetype in outfiles:
                outfiles[filetype]['process'].wait()
    if dryrun:
        return
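The batch sizing and the loop over batches in do_xml_stream() are easier to follow when pulled out into two small pure functions. This is a sketch only: pick_interval, batch_ranges and the end_was_given flag are names invented here, and the sketch assumes that the small_interval shortcut applies only when no end was passed in and the dump starts at id 1, which is how the code above reads.

```python
def pick_interval(start, end, small_interval, max_interval, end_was_given):
    """
    choose how many ids go into one batch
    (hypothetical helper mirroring the logic in do_xml_stream)
    """
    interval = None
    # small wiki dumped in full: take the fixed small batch size
    if not end_was_given and start == 1 and end < 1000000:
        interval = small_interval
    if interval is None:
        # aim for about 50 batches, but never 0 ids and never
        # more than max_interval ids per batch
        interval = (end - start) // 50
        if interval == 0:
            interval = 1
        elif interval > max_interval:
            interval = max_interval
    return interval


def batch_ranges(start, end, interval):
    """
    yield (first_id, exclusive_end_id) for each batch covering
    start..end inclusive, as passed to --start and --end
    """
    upto = start
    while upto <= end:
        if upto + interval <= end:
            yield (upto, upto + interval)
        else:
            yield (upto, end + 1)
        upto += interval


if __name__ == '__main__':
    size = pick_interval(1, 10000000, 50000, 100000, end_was_given=False)
    print("batch size:", size)
    print(list(batch_ranges(1, 10, 4)))
```

Note that the last batch ends at end + 1 because the maintenance script treats its --end value as exclusive.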
But of course now we have to look at do_xml_piece() which does the real work.
Although the comment says we do three retries, the number of retries is actually a configuration setting. Woops!
def do_xml_piece(command, outfiles, wikiconf, ends_with=None, dryrun=False, verbose=False):
    '''
    do one piece of a logs dump, output going uncompressed
    to a temporary file and then that file being shovelled
    into the compressor's stdin
    we do three retries with plenty of delay, in case
    the db server has issues or some other problem
    crops up
    '''
    if dryrun:
        sys.stderr.write("would run command: %s\n" % " ".join(command))
        return
    if verbose:
        sys.stderr.write("running command: %s\n" % " ".join(command))
    retries = 0
    maxretries = wikiconf.max_retries
    timeout = 60
Try the command until we get success or hit maxretries.
We wait a bit in between retries, in case it's a db server issue or a network hiccup, something that might be remedied in a few minutes. All partial output files are removed on failure, just to keep cruft from accumulating on disk.
    while retries < maxretries:
        try:
            result = run_script(command, outfiles, ends_with)
        except Exception:
            exc_type, exc_value, exc_traceback = sys.exc_info()
            sys.stderr.write(repr(traceback.format_exception(exc_type, exc_value, exc_traceback)))
            result = False
        if result:
            break
        time.sleep(timeout)
        timeout = timeout * 2
        retries += 1
    if not result:
        sys.stderr.write("failed job after max retries\n")
        for filetype in outfiles:
            try:
                # don't bother to save partial files, cleanup everything
                # (the compressor's Popen object lives under 'process';
                # 'compr' only holds the function and the output file name)
                outfiles[filetype]['process'].stdin.close()
                os.unlink(outfiles[filetype]['temp'])
            except Exception:
                # files might not be there, we don't care
                pass
        sys.exit(1)
    errors = False
    for filetype in outfiles:
        try:
Cat (or gzcat) the temp file to the final output file. That’s right, we write out an uncompressed intermediate file to disk, read it, and feed it to the compressor.
Here stuff can go awry; if, for some reason, we can’t feed the temp file to the output compressor correctly, then all bets are off.
            catfile(outfiles[filetype]['temp'], outfiles[filetype]['process'])
        except Exception:
            exc_type, exc_value, exc_traceback = sys.exc_info()
            sys.stderr.write(repr(traceback.format_exception(exc_type, exc_value, exc_traceback)))
            errors = True
            try:
                # get rid of the final output file, it's crap now
                os.unlink(outfiles[filetype]['compr'][1])
            except Exception:
                exc_type, exc_value, exc_traceback = sys.exc_info()
                sys.stderr.write(repr(traceback.format_exception(
                    exc_type, exc_value, exc_traceback)))
Clean up all the temp files, whether we won or lost…
    # get rid of all temp files, regardless
    for filetype in outfiles:
        try:
            os.unlink(outfiles[filetype]['temp'])
        except Exception:
            exc_type, exc_value, exc_traceback = sys.exc_info()
            sys.stderr.write(repr(traceback.format_exception(exc_type, exc_value, exc_traceback)))
And if something unrecoverable broke, then we have to give the bad news to the caller.
    if errors:
        # consider ourselves screwed. the parent process can start over
        sys.exit(1)
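Stripped of the file handling, the retry loop in do_xml_piece() is an ordinary retry with a doubling delay. Here is a minimal standalone sketch of that pattern. The function name retry_with_backoff and the injectable sleep parameter are inventions for this example, and unlike the loop above the sketch does not sleep after the final failed attempt.

```python
import time


def retry_with_backoff(action, max_retries, first_delay=60, sleep=time.sleep):
    """
    call action() until it returns something truthy or until
    max_retries attempts have been made, doubling the delay
    between attempts; exceptions count as failed attempts
    returns True on success, False if every attempt failed
    (hypothetical helper, not part of the dumps code)
    """
    delay = first_delay
    for attempt in range(max_retries):
        try:
            if action():
                return True
        except Exception:
            # treat any exception like an ordinary failure and retry
            pass
        if attempt < max_retries - 1:
            sleep(delay)
            delay *= 2
    return False


if __name__ == '__main__':
    outcomes = iter([False, False, True])
    delays = []
    # record the delays instead of really sleeping
    succeeded = retry_with_backoff(lambda: next(outcomes), 5, 60, delays.append)
    print(succeeded, delays)
```

Passing the sleep function in as a parameter is just a convenience for trying the logic out without waiting for minutes.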
The other fun things to look at here are the methods that handle the i/o for these temp files and the final output file. Not much to say here except that it’s all very gross.
I'm not even sure whether passing such a large argument to read() is the best approach over NFS.
Note again that we make use of gzip's "concat multiple gzip files together and get a valid gzip file as a result" property.
def catfile(inputfile, process):
    '''
    read a file, cat it as fast as possible to the
    stdin of the process passed, then go away
    '''
    with open(inputfile, "r") as fhandle:
        while True:
            content = fhandle.read(1048576)
            if not content:
                fhandle.close()
                break
            process.stdin.write(content.encode('utf-8'))
def gzippit_append(outfile):
    '''
    start a gzip process that reads from stdin
    and appends to the specified file
    '''
    process = Popen("gzip >> %s" % outfile, stdin=PIPE, shell=True, bufsize=-1)
    return process
def gzippit(outfile):
    '''
    start a gzip process that reads from stdin
    and writes to the specified file
    '''
    process = Popen("gzip > %s" % outfile, stdin=PIPE, shell=True, bufsize=-1)
    return process
def catit(outfile):
    '''
    start a cat process that reads from stdin
    and writes to the specified file
    '''
    process = Popen("cat > %s" % outfile, stdin=PIPE, shell=True, bufsize=-1)
    return process
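One possible tidy-up for catfile(), offered as a sketch rather than as what the dumps code does: open the temp file in binary mode and let shutil.copyfileobj do the chunked copy, which skips the decode and re-encode round trip. The name catfile_binary is made up here, and whether this behaves any better over NFS is not something this example can show.

```python
import io
import os
import shutil
import tempfile


def catfile_binary(inputfile, sink, chunk_size=1048576):
    """
    copy inputfile to sink (any writable binary stream, for example
    the stdin of a compressor process) in chunks of chunk_size bytes,
    without decoding the content
    (hypothetical variant of catfile(), not part of the dumps code)
    """
    with open(inputfile, "rb") as fhandle:
        shutil.copyfileobj(fhandle, sink, chunk_size)


if __name__ == '__main__':
    # stand-in for the temp file written by the maintenance script
    with tempfile.NamedTemporaryFile(delete=False) as tmp:
        tmp.write("<logitem>caf\u00e9</logitem>\n".encode('utf-8'))
    # stand-in for process.stdin
    sink = io.BytesIO()
    catfile_binary(tmp.name, sink, chunk_size=8)
    print(sink.getvalue().decode('utf-8'))
    os.unlink(tmp.name)
```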
We won’t bother to delve into run_script(), which is straightforward enough if you just read the code.
There is a little sanity check on the output, to be sure it ends with the right xml tag, if that's a thing for the specific kind of file being produced.
def run_script(command, outfiles, shouldendwith=None):
    '''
    given a command
    returns True on success, False on failure
    '''
    failed = False
    process = Popen(command)
    # would be best for there to be a timeout for this eh?
    process.wait()
    retval = process.returncode
    if not retval:
        for filetype in outfiles:
            outfile = outfiles[filetype]['temp']
            if os.path.exists(outfile):
                # file could be empty (all pages in the range deleted)
                if os.path.getsize(outfile) > 0:
                    if shouldendwith is not None:
                        with open(outfile, 'rb') as outfd:
                            outfd.seek(len(shouldendwith) * -1, os.SEEK_END)
                            remainder = outfd.read().decode('utf-8')
                            outfd.close()
                        if remainder != shouldendwith:
                            os.unlink(outfile)
                            sys.stderr.write(
                                "bad output saved to {ofile} from '{command}'\n".format(
                                    ofile=outfile, command=" ".join(command)))
                            failed = True
    else:
        sys.stderr.write("nonzero return {retval} from command '{command}'\n".format(
            retval=retval, command=" ".join(command)))
        failed = True
    if failed:
        return False
    return True
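The sanity check on the end of the output file can be tried out on its own. The following sketch uses an invented helper name, file_ends_with, and adds one guard that run_script() does not have: it compares sizes first, since seeking backwards past the start of a very short file raises an error.

```python
import os
import tempfile


def file_ends_with(path, expected_ending):
    """
    return True if the file at path ends with expected_ending
    (a str, compared as utf-8 bytes), False otherwise
    (hypothetical helper, not part of the dumps code)
    """
    ending = expected_ending.encode('utf-8')
    if os.path.getsize(path) < len(ending):
        # too short to contain the ending at all
        return False
    with open(path, 'rb') as infile:
        infile.seek(-len(ending), os.SEEK_END)
        return infile.read() == ending


if __name__ == '__main__':
    with tempfile.NamedTemporaryFile(delete=False) as tmp:
        tmp.write(b"<logitem>\n  <id>1</id>\n</logitem>\n")
    print(file_ends_with(tmp.name, '</logitem>\n'))
    os.unlink(tmp.name)
```

Comparing bytes rather than decoded text also avoids a decode error if the seek happens to land in the middle of a multi-byte character.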
get_max_id() is similarly straightforward; it is reproduced here just for completeness.
We do retry the select a few times on failure, in case the db server becomes temporarily unavailable, for example. No point in having to abort an entire dump because of a momentary glitch.
def get_max_id(wikiconf, wikidb, id_field, table):
    '''
    retrieve the largest id for this wiki from the db for specific table
    pass in name of id field, name of table
    '''
    wiki = Wiki(wikiconf, wikidb)
    db_info = DbServerInfo(wiki, wikidb)
    query = "select MAX(%s) from %s%s;" % (
        id_field, db_info.get_attr('db_table_prefix'), table)
    results = None
    retries = 0
    maxretries = wiki.config.max_retries
    end = 0
    results = db_info.run_sql_and_get_output(query)
    if results:
        lines = results.splitlines()
        # the value is on the second line; check it exists before indexing
        if len(lines) > 1 and lines[1]:
            if not lines[1].isdigit():
                return 0   # probably NULL or missing table
            end = int(lines[1])
            return end
    while results is None and retries < maxretries:
        retries = retries + 1
        time.sleep(5)
        results = db_info.run_sql_and_get_output(query)
        if not results:
            continue
        lines = results.splitlines()
        if len(lines) > 1 and lines[1]:
            end = int(lines[1])
            break
    if not end:
        sys.stderr.write("failed to get max page id from db, exiting\n")
        sys.exit(1)
    else:
        return end
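The parsing half of get_max_id() can be separated from the database call, which makes the three possible outcomes easier to see. This sketch assumes, as the lines[1] indexing above suggests, that the query output is a header line followed by a single value line; parse_max_id is a name invented for the example.

```python
def parse_max_id(output):
    """
    parse the text output of a "select MAX(...)" query
    returns the id as an int, 0 if the value is not a number
    (NULL, for instance), or None if there is no value line at all
    (hypothetical helper; the output format is an assumption)
    """
    if not output:
        return None
    lines = output.splitlines()
    if len(lines) < 2 or not lines[1]:
        return None
    value = lines[1].strip()
    if not value.isdigit():
        return 0
    return int(value)


if __name__ == '__main__':
    print(parse_max_id("MAX(log_id)\n12345\n"))
    print(parse_max_id("MAX(log_id)\nNULL\n"))
    print(parse_max_id(""))
```

A caller could then retry only when the result is None, and treat 0 as "nothing to dump".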
Yes, it is gross, I admit it
For the last bit of gross, note that xmllogs.py is called as a script from the xmljobs.py module. So we have:
worker.py -> potentially multiple copies of xmllogs.py -> multiple runs of dumpBackup.php for each copy, where “->” is the “why yes we are forking out” operator :-P