В этой статье мы рассмотрим, как собрать и использовать Spark Job Server в виде приложения для Docker. Spark Job Server — полезное дополнение Spark, которое позволяет загружать и выполнять задания Spark через Rest API.
Spark Job Server входит в состав популярных дистрибутивов для больших данных, таких как Cloudera CDH5 или Hortonworks, однако, установка полноценного кластера избыточна для целей обучения, разработки или малых вычислительных задач.
В данном руководстве вы узнаете как развернуть готовую среду для разработки, тестирования и продуктового использования Apache Spark без внешних зависимостей с использованием Spark Job Server.
Сборка Spark Job Server с помощью SBT и Docker
Мы будем собирать контейнер Spark Job Server (SJS), который является самодостаточным и может использоваться для работы со встроенным сервером Spark без внешних зависимостей, кроме того, вы можете изменить файл конфигурации и подключить Spark Job Server к существующему кластеру Spark.
Установка Docker
Для успешного выполнения дальнейших действий вам необходимо установить Docker для вашей операционной системы. Если у вас уже настроен и установлен Docker, просто пропустите этот шаг. Если Docker не установлен, установите его по одному из руководств: Ubuntu 16.04 и 18.04, Debian 9 и 10, Centos 7 и приступайте к следующему шагу.
Установка Scala Build Tool
Для сборки потребуется утилита SBT, с помощью которой мы будем собирать контейнер Docker.
Установка SBT для Ubuntu / Debian может быть выполнена следующими командами:
echo "deb https://dl.bintray.com/sbt/debian /" | sudo tee -a /etc/apt/sources.list.d/sbt.list sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 2EE0EA64E40A89B84B2DF73499E82A75642AC823 sudo apt-get update sudo apt-get install sbt
Установка для CentOS выполняется, как описано ниже:
curl https://bintray.com/sbt/rpm/rpm > bintray-sbt-rpm.repo sudo mv bintray-sbt-rpm.repo /etc/yum.repos.d/ sudo yum install sbt
Теперь скачаем исходные коды SJS с GitHub:
git clone https://github.com/spark-jobserver/spark-jobserver.git cd spark-jobserver/
Сборка базового образа Docker для SJS выполняется с помощью команды sbt docker. Если доступ к Docker доступен только через sudo, запускайте следующую команду, используя sudo:
sbt docker job-server-tests/package
Сборка займет довольно продолжительное время, итогом чего станет генерация контейнера:
$ docker images | grep spark-jobserver velvia/spark-jobserver 0.9.1-SNAPSHOT.mesos-1.0.0.spark-2.3.2.scala-2.11.jdk-8-jdk 3ee46d478a7a 5 days ago 1.41GB
Обратите внимание на версию Spark в имени контейнера, в нашем случае — 2.3.2. Это нам понадобится при доустановке в контейнер необходимых зависимостей для PySpark.
Если вы планируете использовать контейнер для выполнения заданий Spark, реализованных на Scala или Java, можно перейти сразу к разделу «Запуск Spark Job Server». Далее мы рассмотрим как на основании полученного образа собрать подходящий для использования совместно с PySpark.
Сборка контейнера для PySpark
Мы будем использовать полученный образ как основу для генерации целевого образа, который будет поддерживать PySpark.
Создадим каталог для сборки нового контейнера, назовем его spark-jobserver-pyspark:
cd .. && mkdir spark-jobserver-pyspark
Поместим в этот каталог новый Dockerfile, который будет использоваться для построения целевого контейнера:
cat > Dockerfile from velvia/spark-jobserver:0.9.1-SNAPSHOT.mesos-1.0.0.spark-2.3.2.scala-2.11.jdk-8-jdk RUN apt-get install -y python-pip && pip --no-cache-dir install pyhocon pyspark==2.3.2 py4j Ctrl+D
Обратите внимание на то какая версия указана для PySpark, вы должны указать версию, которая соответствует версии Spark в контейнере SJS (первая строка листинга).
Если вам необходимо добавить в контейнер дополнительные файлы, библиотеки и т.п., вы можете дописать необходимые инструкции в Dockerfile.
Теперь вы можете собрать финальный образ:
docker build -t pyspark-job-server:2.3.2 .
Итогом сборки должен стать контейнер, который далее будет использоваться для работы.
Тестирование работоспособности контейнера
Для тестирования контейнера мы запустим его и выполним тестовое задание Spark:
docker run --name sjs -d --rm -p 8090:8090 spark-job-server
Теперь, если вы перейдете в браузере по адресу http://localhost:8090/, то сможете видеть панель мониторинга SJS:
Панель довольно простая и предназначена для просмотра задач, контекстов, загруженных файлов. Так же панель позволяет принудительно завершить задачу или контекст (kill). Все взаимодействие с SJS осуществляется посредством REST API.
Два важных момента:
- Если сервер, на котором вы используете SJS доступен извне, необходимо при запуске использовать -p 127.0.0.1:8090:8090 вместо -p 8090:8090, а доступ к сервису предоставлять через Nginx с аутентификацией, например, через базовую аутентификацию HTTP. Так же лучше использовать защищенный протокол HTTPS. В наших руководствах вы сможете найти как это сделать для Ubuntu и Debian, CentOS с помощью бесплатного сертификата Let’s Encrypt.
- SJS по умолчанию хранит свои данные во встраиваемой СУБД H2, которая не показывает стабильную работу и мы не рекомендуем ее использование за пределами тестирования, далее мы расскажем как заменить H2 на MariaDB.
Подключение СУБД MariaDB
Теперь необходимо установить и настроить MariaDB в виде приложения Docker по инструкции в нашей статье. Если лень читать все подряд, можно выполнить просто следующую команду:
docker run -d --restart=always --name mariadb_1 \ -v /opt/mariadb/data:/var/lib/mysql \ -v /opt/mariadb/etc:/etc/mysql/conf.d \ -v /opt/mariadb/logs:/var/lib/mysql/logs \ -e MYSQL_ROOT_PASSWORD=secret \ -p 127.0.0.1:3306:3306 mariadb:10.3
Дождитесь окончания инициализации MariaDB (5-10 секунд). Теперь создадим базу данных для SJS:
docker run -it --rm --link mariadb_1:mysql mariadb:10.3 mysql -hmysql -uroot -psecret mysql> CREATE USER 'jobserver'@'%' IDENTIFIED BY 'secret'; mysql> CREATE DATABASE spark_jobserver; mysql> GRANT ALL ON spark_jobserver.* TO 'jobserver'@'%'; mysql> FLUSH PRIVILEGES; CTRL^D
Проверьте соединение с созданной БД из под нового пользователя:
docker run -it --rm --link mariadb_1:mysql mariadb:10.3 mysql -hmysql -ujobserver -psecret spark_jobserver -e 'SELECT VERSION();'
Теперь скопируем файл конфигурации для SJS из контейнера в файловую систему сервера:
docker cp sjs:app/docker.conf .
Остановите сервер SJS, пока что он не требуется, в следующий раз мы запустим его уже с бэкендом MySQL:
docker stop sjs
Модифицируем конфигурационный файл docker.conf для работы c MySQL. В новых версиях SJS в конфигурационном файле могут быть новые параметры, поэтому убедитесь, что они будут перенесены в целевой конфигурационный файл. На момент публикации конфигурационный файл содержит следующие параметры:
spark { master = "local[*]" master = ${?SPARK_MASTER} submit.deployMode = "client" job-number-cpus = 4 jobserver { port = 8090 jobdao = spark.jobserver.io.JobSqlDAO context-creation-timeout = 60000 context-per-jvm = true # ---------------------- # настройки MYSQL sqldao { slick-driver = slick.driver.MySQLDriver jdbc-driver = com.mysql.jdbc.Driver jdbc { url = "jdbc:mysql://mysql/spark_jobserver" user = "jobserver" password = "secret" } dbcp { maxactive = 20 maxidle = 10 initialsize = 10 } } } context-settings { num-cpu-cores = 4 memory-per-node = 8192m passthrough { } } home = "/spark" } deploy { manager-start-cmd = "app/manager_start.sh" } # --------------------- # создание таблиц MySQL flyway.locations="db/mysql/migration"
Обратите внимание на параметры spark.job-number-cpus, spark.context-settings — они должны быть настроены для соответствия вашему узлу, на котором вы запускаете SJS.
Так же, обратите внимание, что вместо local[*] для параметра spark.master можно указать адрес действующего мастера Spark, в том числе и через переменную docker -e SPARK_MASTER=URI. По умолчанию, будет использоваться встраиваемый сервер Spark, что вполне подходит при выполнении вычислений на одном узле.
Теперь можно запустить SJS с бэкендом MySQL:
docker run --restart=always --link mariadb_1:mysql \ -v $(pwd)/docker.conf:/app/docker.conf \ -v $(pwd)/data:/tmp/data \ --name sjs -d \ -p 8090:8090 spark-job-server
Если сервер, на котором вы используете SJS доступен извне, необходимо при запуске использовать -p 127.0.0.1:8090:8090 вместо -p 8090:8090, а доступ к сервису предоставлять через Nginx с аутентификацией, например, через базовую аутентификацию HTTP. Так же лучше использовать защищенный протокол HTTPS. В наших руководствах вы сможете найти как это сделать для Ubuntu и Debian, CentOS с помощью бесплатного сертификата Let’s Encrypt.
Запуск приложения Spark в Spark Job Server
Для дальнейшего тестирования SJS загрузим тестовые приложения Spark, которые поставляются в составе SJS:
JOBFILE=$(find ../spark-jobserver/job-server-tests/target/ -name 'job-server-tests*.jar') curl -X POST localhost:8090/binaries/test -H 'Content-Type: application/java-archive' --data-binary @$JOBFILE
Запустим приложение подсчета слов:
curl -d "input.string = a b c a b see" "localhost:8090/jobs?appName=test&classPath=spark.jobserver.WordCountExample" { "duration": "Job not done yet", "classPath": "spark.jobserver.WordCountExample", "startTime": "2019-04-17T14:44:08.435Z", "context": "78c96fd8-222f-4a5a-af24-10171c42065b-spark.jobserver.WordCountExample", "status": "STARTED", "jobId": "8a99ed4e-8a54-4ab0-b61e-fbe2bb05bda8", "contextId": "f2b0dbea-d95f-4190-9b37-a87a055d1acc" }
Теперь можно отслеживать состояние задачи по JobID, как только задача завершится, вы сможете увидеть результат ее выполнения:
curl localhost:8090/jobs/8a99ed4e-8a54-4ab0-b61e-fbe2bb05bda8 { "duration": "1.0 secs", "classPath": "spark.jobserver.WordCountExample", "startTime": "2019-04-17T14:44:08.000Z", "context": "78c96fd8-222f-4a5a-af24-10171c42065b-spark.jobserver.WordCountExample", "result": { "a": 2, "b": 2, "see": 1, "c": 1 }, "status": "FINISHED", "jobId": "8a99ed4e-8a54-4ab0-b61e-fbe2bb05bda8", "contextId": "f2b0dbea-d95f-4190-9b37-a87a055d1acc" }
Этот же результат можно увидеть и через браузер:
Запуск приложения PySpark
Создадим приложение SJS для Spark по инструкции:
mkdir job cat > job/__init__.py from sparkjobserver.api import SparkJob, build_problems class WordCountSparkJob(SparkJob): def validate(self, context, runtime, config): if config.get('input.strings', None): return config.get('input.strings') else: return build_problems(["config 'input.strings' not found"]) def run_job(self, context, runtime, data): return context._sc.parallelize(data).countByValue() Ctrl^D
Скопируем зависимости для взаимодействия с API SJS в проект:
cp -R ../spark-jobserver/job-server-python/src/python/sparkjobserver .
Создадим файл для сборки пакета Python:
cat > setup.py from setuptools import find_packages, setup setup( name='job', packages=['job','sparkjobserver'], include_package_data=True, ) Ctrl^D
Создадим пакет приложения:
python setup.py bdist_egg running bdist_egg running egg_info writing job.egg-info/PKG-INFO writing top-level names to job.egg-info/top_level.txt writing dependency_links to job.egg-info/dependency_links.txt reading manifest file 'job.egg-info/SOURCES.txt' writing manifest file 'job.egg-info/SOURCES.txt' installing library code to build/bdist.linux-x86_64/egg running install_lib running build_py creating build/bdist.linux-x86_64/egg creating build/bdist.linux-x86_64/egg/sparkjobserver copying build/lib.linux-x86_64-2.7/sparkjobserver/subprocess.py -> build/bdist.linux-x86_64/egg/sparkjobserver copying build/lib.linux-x86_64-2.7/sparkjobserver/api.py -> build/bdist.linux-x86_64/egg/sparkjobserver copying build/lib.linux-x86_64-2.7/sparkjobserver/__init__.py -> build/bdist.linux-x86_64/egg/sparkjobserver creating build/bdist.linux-x86_64/egg/job copying build/lib.linux-x86_64-2.7/job/__init__.py -> build/bdist.linux-x86_64/egg/job byte-compiling build/bdist.linux-x86_64/egg/sparkjobserver/subprocess.py to subprocess.pyc byte-compiling build/bdist.linux-x86_64/egg/sparkjobserver/api.py to api.pyc byte-compiling build/bdist.linux-x86_64/egg/sparkjobserver/__init__.py to __init__.pyc byte-compiling build/bdist.linux-x86_64/egg/job/__init__.py to __init__.pyc creating build/bdist.linux-x86_64/egg/EGG-INFO copying job.egg-info/PKG-INFO -> build/bdist.linux-x86_64/egg/EGG-INFO copying job.egg-info/SOURCES.txt -> build/bdist.linux-x86_64/egg/EGG-INFO copying job.egg-info/dependency_links.txt -> build/bdist.linux-x86_64/egg/EGG-INFO copying job.egg-info/top_level.txt -> build/bdist.linux-x86_64/egg/EGG-INFO zip_safe flag not set; analyzing archive contents... creating 'dist/job-0.0.0-py2.7.egg' and adding 'build/bdist.linux-x86_64/egg' to it removing 'build/bdist.linux-x86_64/egg' (and everything under it)
Загрузим приложение PySpark на сервер:
JOBFILE=$(find -name *.egg) curl --data-binary @$JOBFILE -H 'Content-Type: application/python-archive' localhost:8090/binaries/my_py_job
Создадим контекст для исполнения задачи:
curl -X POST "localhost:8090/contexts/py-context?context-factory=spark.jobserver.python.PythonSessionContextFactory" { "status": "SUCCESS", "result": "Context initialized" }
Запустим задачу:
curl-d 'input.strings = ["a", "b", "a", "b" ]' "localhost:8090/jobs?appName=my_py_job&classPath=job.WordCountSparkJob&context=py-context" { "duration": "Job not done yet", "classPath": "job.WordCountSparkJob", "startTime": "2019-04-17T15:17:58.397Z", "context": "py-context", "status": "STARTED", "jobId": "932bc1d3-011e-459b-9618-111832a1ef87", "contextId": "555eb5ba-9951-4917-982b-8b944cc782dc" } curl localhost:8090/jobs/932bc1d3-011e-459b-9618-111832a1ef87 { "duration": "1.0 secs", "classPath": "job.WordCountSparkJob", "startTime": "2019-04-17T15:17:58.000Z", "context": "py-context", "result": { "a": 2, "b": 2 }, "status": "FINISHED", "jobId": "932bc1d3-011e-459b-9618-111832a1ef87", "contextId": "555eb5ba-9951-4917-982b-8b944cc782dc" }
Теперь вы можете использовать SJS для работы со Spark с помощью REST API. Дальнейшие инструкции по работе с PySpark и SJS можно найти в профильной статье. Если вы планируете использовать Scala, вам следует обратиться к примерам для Scala, если Java — к примерам для Java.
Заключение
Spark Job Server — мощный инструмент, который позволяет работать с кластером или локальным вычислителем Spark эффективно. Уникальные возможности использования постоянных контекстов Spark позволяют запускать родственные задачи быстро с уменьшенными накладными расходами.
За дальнейшими инструкциями по использованию SJS мы рекомендуем обратиться к официальной документации.