Platform Engineering Team/Event Platform Value Stream/PoC Mediawiki Stream Enrichment
This page describes an implementation of T307959.
1. Code: https://gitlab.wikimedia.org/repos/data-engineering/mediawiki-stream-enrichment
2. Package: https://gitlab.wikimedia.org/repos/data-engineering/mediawiki-stream-enrichment/-/packages/234
3. Status: PoC
4. Deployment environment: YARN https://phabricator.wikimedia.org/T323914
Mediawiki Stream Enrichment
[edit]A proof of concept Flink Service that consumes page_change events and produces wikitext enriched events in page_content_change.
- Source schema: https://schema.wikimedia.org/repositories//primary/jsonschema/development/mediawiki/page/change/1.1.0.yaml
- Destination schema: https://schema.wikimedia.org/repositories//primary/jsonschema/development/mediawiki/page/change/1.1.0.yaml
Consume enriched events.
[edit]Eneriched events are produced into eqiad.rc0.mediawiki.page_content_change brokered in Kafka Jumbo,
Example:
kafkacat -C -b kafka-jumbo1001.eqiad.wmnet:9092 -t eqiad.rc0.mediawiki.page_content_change
Flink on YARN
[edit]The job has been tested on Flink 1.15.
tl;dr: the steps below are more or less automated by this unsupported script: https://gitlab.wikimedia.org/-/snippets/41 .
A standalone cluster can be setup locally (on a stat machine atop YARN) with
wget <nowiki>https://dlcdn.apache.org/flink/flink-1.15.0/flink-1.15.0-bin-scala_2.12.tgz</nowiki>
tar xvzf flink-1.15.0-bin-scala_2.12.tgz
cd flink-1.15.0
export HADOOP_CLASSPATH=`hadoop classpath`
./bin/yarn-session.sh --detached
The package target can be manually copied to a stat machine with:
scp target/enrichment-1.0-SNAPSHOT-jar-with-dependencies.jar stat1005.eqiad.wmnet:~/flink-1.15.0
Start a Flink cluster on YARN with
export HADOOP_CLASSPATH=`hadoop classpath`
./bin/yarn-session.sh --detached
From yarn.wikimedia.org you can access the Flink dashboard. This will allow monitoring job execution (Task Manager panel), and eventually stopping the job.
Job lifecycle management
[edit]Launch the job
[edit]Finally launch the job with
./bin/flink run -c org.wikimedia.mediawiki.event.enrichment.Enrichment enrichment-1.0-SNAPSHOT-jar-with-dependencies.jar
Restart the job
[edit]The job uses kafka offsets to determine start (resume) points. A stopped job can be restarted. Streaming will resume from the latest recorded Kafka offset.
Yarn deployment (long lived kerberos ticket)
[edit]Currently Mediawiki Stream Enrichment runs as `analytics` job in the YARN production queue. This deployment consist of a Session cluster and the job itself.
Startup scripts can be found at:
- Download and setup Flink 1.15 https://gitlab.wikimedia.org/-/snippets/45
- Start Flink Session cluster and enrichment job (latest release) https://gitlab.wikimedia.org/-/snippets/43#LC5
By default a flink dist is setup in `/tmp` on `an-launcher1002`. This is an intentional ephemeral installation. Upon server restart Flink cluster and the enrichment job will need to be re-deployed.
View the output of a Flink job at the command line
[edit]On YARN stdout is directed to the container job, and won't be visible from the cli. We can display container output by accessing its logs with
yarn logs -applicationId <applicationId> -containerId <containerId>
Where
- <applicationId> is the Flink cluster id returned by yarn-session.sh, and visible at https://yarn.wikimedia.org.
- <containerId> is the container running a specific task, that you can find in Flink's Task Manager at https://yarn.wikimedia.org/proxy/<applicationId>/#/task-manager.
For more details see the project doc. The Flink Web Interface will be available at yarn.wikimedia.org under https://yarn.wikimedia.org/proxy/<applicationId>.
Config
[edit]There's a couple of gotchas.
JVM
[edit]We need to rewrite the Host HTTP header to properly route HTTP request from the internal YARN cluster to https://api-ro.discovery.wmnet.
To do so, we need to configure the JVM http-client to allow restricted headers.
Add the following to conf/flink-conf.yaml:
env.java.opts: -Dsun.net.http.allowRestrictedHeaders=true
Kerberos
[edit]Kerberos authentication is required to access WMF Analytics resources. The relevant config settings are found in conf/flink-conf.yaml: ===
security.kerberos.login.use-ticket-cache: true
<nowiki>#</nowiki> security.kerberos.login.keytab:
security.kerberos.login.principal: krbtgt/WIKIMEDIA@WIKIMEDIA
<nowiki>#</nowiki> The configuration below defines which JAAS login contexts
security.kerberos.login.contexts: Client,KafkaClient
Scala free Flink
[edit]flink-scala deps must be removed from the Flink distribution. As of 1.15 we run flink scala free. See https://flink.apache.org/2022/02/22/scala-free.htm.
rm flink-1.15/lib/flink-scala*