Jump to content

Analytics/Kraken/Tutorial

From mediawiki.org

Cool, Hadoop! What can I do?

Pig

[edit]

Pig is a data processing language designed to run on top of a Hadoop cluster which makes it easy to run map reduce jobs. It comes with an interactive REPL, Grunt[1], and there are growing number of mature open source libraries for common tasks [2][3].

Resources

[edit]

To get started writing pig scripts, there is a excellent (though outdated) O'Reilly Media publication Programming Pig[4] freely available online and a nice IBM DeveloperWorks introduction[5]. There is also a thorough official documentation wiki from the apache software foundation[6].

Examples

[edit]

But perhaps the best place to start is looking at some example scripts to start playing with. The analytics team has a repository hosted on Github which contains a variety of Pig scripts [7] and user defined functions (UDFs)[8] which you can check out.

As a disclaimer, none of this code is very mature and a lot can depend on your set up, so it may take a while to get things actually working. For example, in order to use the piggybank UDFs or special Wikipedia specific UDFs you'll need to reference the shared library directory 'hdfs:///libs'. Our geocoding solution is also a big hacky at the moment, requiring you to place the geocoding database (usually called 'GeoIP.dat') in your hdfs home directory.

Here is a slightly modified and lightly annotated basic script which counts web requests count.pig:


REGISTER 'hdfs:///libs/piggybank.jar' --need to load jar files in order to use them
REGISTER 'hdfs:///libs/kraken.jar'
REGISTER 'hdfs:///libs/geoip-1.2.3.jar' --this needs to be located in your home directory
-- it is also necessary AFAWK to place the geocoding database, GeoIP.dat in your home directory
-- in order for geocoding to work

DEFINE EXTRACT org.apache.pig.piggybank.evaluation.string.RegexExtract();
--define statements shorten UDF reference
DEFINE PARSE org.wikimedia.analytics.kraken.pig.ParseWikiUrl();
DEFINE GEO org.wikimedia.analytics.kraken.pig.GetCountryCode('GeoIP.dat');
--or allow constructor arguments to be passed
DEFINE TO_MONTH org.wikimedia.analytics.kraken.pig.ConvertDateFormat('yyyy-MM-dd\'T\'HH:mm:ss', 'yyyy-MM');
DEFINE TO_MONTH_MS org.wikimedia.analytics.kraken.pig.ConvertDateFormat('yyyy-MM-dd\'T\'HH:mm:ss.SSS', 'yyyy-MM');
DEFINE HAS_MS org.wikimedia.analytics.kraken.pig.RegexMatch('.*\\.[0-9]{3}');

-- LOAD just takes a directory name and allows you 
-- to specify a schema with the AS command
LOG_FIELDS     = LOAD '$input' USING PigStorage(' ') AS (
    hostname,
    udplog_sequence,
    timestamp:chararray,
    request_time,
    remote_addr:chararray,
    http_status,
    bytes_sent,
    request_method:chararray,
    uri:chararray,
    proxy_host,
    content_type:chararray,
    referer,
    x_forwarded_for,
    user_agent );

LOG_FIELDS     = FILTER LOG_FIELDS BY (request_method MATCHES '(GET|get)');
LOG_FIELDS     = FILTER LOG_FIELDS BY content_type == 'text/html' OR (content_type == '-');

PARSED    = FOREACH LOG_FIELDS GENERATE
		    FLATTEN(PARSE(uri)) AS (language_code:chararray, isMobile:chararray, domain:chararray),
		    GEO(remote_addr) AS country,
		    (HAS_MS(timestamp) ? TO_MONTH_MS(timestamp) : TO_MONTH(timestamp)) AS month;

FILTERED    = FILTER PARSED BY (domain eq 'wikipedia.org');
        
GROUPED        = GROUP FILTERED BY (month, language_code, isMobile, country);

COUNT    = FOREACH GROUPED GENERATE
            FLATTEN(group) AS (month, language_code, isMobile, country),
            COUNT_STAR(FILTERED);

-- grunt / pig won't actually do anything until they see a STORE or DUMP command
STORE COUNT into '$output';

and the call to invoke it from inside of grunt if the script is in your home directory on HDFS:

grunt> exec -param input=/traffic/zero -param output=zero_counts hdfs:///user/<USER_NAME>/count.pig 

which should create a collection of files on HDFS in a directory names 'counts' within your home directory.

Monitoring

[edit]

To monitor one of your jobs once it has started, go to http://jobs.analytics.wikimedia.org/cluster/apps/RUNNING and you can see the number of mappers and reducers finished.

(If you are using the Browser Configured Proxy, the job URL is http://analytics1010.eqiad.wmnet:8088/cluster/apps/RUNNING)

Known issues / Annoyances / Feature requests

[edit]
  • Grunt periodically loses the ability to find files on the local file system. So when running a script which lives on an01, you might get the error "ERROR 1000: Error during parsing. File not found: src/kraken/src/main/pig/topk.pig". If this happens, just execute the script directly from the an01 shell with "$ pig ...".
  • The MaxMind GeoIP library requires that the actual database file (not the jar) live in your home directory so that it can be replicated across all of the nodes.
[edit]