Персональные инструменты
 

Как проиндексировать логи бизнес-приложений в Hadoop (SolrCloud)

Материал из CustisWiki

Версия от 12:37, 12 мая 2015; KseniyaKirillova (обсуждение)

(разн.) ← Предыдущая | Текущая версия (разн.) | Следующая → (разн.)
Перейти к: навигация, поиск
Роман Корешков, наш руководитель проектов дирекции развития технологий, опубликовал в корпоративном блоге на «Хабрахабре» статью, посвященную индексации логов бизнес-приложений в Hadoop. Как вынести логи из корпоративных приложений? Каким способом можно реализовать возможность поиска и просмотра списка событий? В чем заключаются особенности настроек и сложности в процессе решения задачи с помощью Hadoop? Об этом — в материале «Как проиндексировать логи бизнес-приложений в Hadoop» на сайте.

Введение

У одного из наших клиентов возникла задача вынести логи из большинства корпоративных приложений и их баз данных «куда-нибудь» — уж больно с ними много возни: растут как на дрожжах, чисти их периодически, а к некоторым еще и доступ должен быть обеспечен в течение многих лет, да еще и анализ хочется проводить системным образом. Конечно же, вынести логи — это не первичная цель. По совокупности требований мы выбрали Hadoop, версию от Cloudera (CDH 5). Требования указывали, что решение, помимо прочего, должно предоставлять возможность поиска и просмотра списка событий (из логов) по заданным критериям, причем желательно быстрого. Причем некоторые приложения также должны быть переделаны, чтобы формы просмотра логов стали использовать Hadoop вместо своих баз данных. Как одно из решений — использовать поисковый модуль SolrCloud, который входит в комплект Hadoop от Cloudera. В Cloudera «из коробки» входят тулзы для выгрузки данных из баз данных приложений и их индексации пачкой (не построчно). Однако такой способ оказался хоть и рабочим, но более трудоемким и непредсказуемым в настройке, чем, скажем, если бы мы использовали Impala для выборки данных. Эта статья описывает детали настройки, а также встреченные в процессе работы особенности. Сценарий такой:

  1. Выгружаем данные из Oracle в файлы на HDFS. Формат файла — avro. Инструмент: sqoop.
    Формат avro имеет много плюсов: он бинарный, данные хорошо сжимаются, с ним вы не будете париться с переводами каретки и с запятыми в текстовых полях, как с CSV, и в самом файле есть схема данных, поддерживает Schema Evolution. Вообще, в Hadoop’е avro пропагандируется как унифицированный формат хранения и передачи данных между разными компонентами, его поддерживают многие инструменты и компоненты. И есть еще один плюс именно для нашей задачи, об этом ниже.
  2. Создаем «коллекцию» в SolrCloud. Инструмент: solrctl.
    Коллекция — полный логический индекс в SolrCloud. Она связана с набором конфигурационных файлов и состоит из одного или нескольких шардов (shard). Если количество шардов более одного — это распределенный индекс.
  3. Запускаем MapReduce-драйвер, который:
    • прочитает все записи из avro-файла;
    • пропустит их через ETL-процесс, написанный в виде morphline-скрипта; результат этого процесса — шард с новыми данными (индекс файлы в формате Solr’а, выложенные в указанную директорию HDFS);
    • сольет (merge) выложенный шард в коллекцию активного SolrCloud без перевода его в offline, по-живому (go-live), так сказать.

Инструмент: команда hadoop, запускающая драйвер org.apache.solr.hadoop.MapReduceIndexerTool, выполняющий сию последовательность.

Все запускаем с основного NameNode’а, хотя это не принципиально. Итак, по шагам...

Выгружаем данные из Oracle в avro файлы

sqoop import --connect jdbc:oracle:thin:@oraclehost:1521/SERVICENAME \
 
--username ausername --password apassword --table ASCHEMA.LOG_TABLE \
 
--as-avrodatafile --compression-codec snappy \
 
-m 16 --split-by NUM_BEG  \
 
--map-column-java NUM_BEG=Integer,DTM_BEG=String,KEY_TYPE=String,OLD_VALUE=String,NEW_VALUE=String,NUM_PARENT=Integer,\
 
NUM_END=Integer,EVENT=String,TRACELEVEL=String,KEY_USER=String,COMPUTER_NAME=String,PRM=String,OPERATION=Integer,\
 
KEY_ENTITY=String,MODULE_NAME=String \
 
--target-dir /user/$USER/solrindir/tmlogavro

Немного о параметрах:

  • connect — строка подключения к базе одного из приложений на Oracle;
  • as-avrodatafile и compression-codec указывают на то, что выгружаться данные будут в файл(ы) формата avro с указанной компрессией, которая в среднем сжимает данные нашей структуры в 10 раз;
  • -m определяет, сколько map-задач (tasks) будут выгружать данные из таблицы. Несколько задач запускаются параллельно. Каждая задача берет свое подмножество записей из таблицы и сохраняет в отдельный файл. Чтобы определить все подмножество, sqoop берет select min(<split-by>), max(<split-by>);
  • from делит полученный диапазон чисел на 16 частей (в нашем примере), и теперь каждая задача будет использовать получившийся поддиапазон чисел как фильтры в SQL-запрос для выбора нужного ей подмножества записей таблицы. По умолчанию split-by берется как первая колонка в Pk таблицы;
  • map-column-java — указание типов колонок в понятиях Sqoop’а. В принципе, Sqoop умеет переваривать большинство Oracle’овых типов колонок, но иногда вы вынуждены подсказывать ему в этом параметре;
  • target-dir — директория в HDFS, в которую нужно сохранять файлы.

Создаем коллекцию

Здесь пользуемся утилитой solrctl для управления развернутым SolrCloud.

Сначала на локальном диске генерируем структуру файлов будущей коллекции, так называемую Collection Instance Directory. В ней мы будем делать/изменять настройки коллекции на локальном диске и затем клонировать их в сервис конфигурации zookeeper, из которого SolrCloud читает необходимые для работы настройки:

solrctl instancedir --generate $HOME/solr_configs_for_tm_log

Здесь параметром является путь до создаваемой локальной директории.

По умолчанию созданные в директории файлы уже наполнены демо-настройкой схемы данных и инструкций к поиску, и нам необходимо удалить лишнее.

Открываем файл conf/schema.xml в созданной директории. Это основной файл коллекции, описывающий структуру индексируемых данных. Удаляем тэг <fields> со своим содержимым, тэг <uniquekey> и все <copyField>. Вместо этого вставляем следующее:

<fields>
        <field name="num_beg" type="int" indexed="true" stored="true" multiValued="false" />
 
        <field name="dtm_beg" type="string" indexed="true" stored="true" multiValued="false" />
 
        <field name="key_type" type="string" indexed="true" stored="true" multiValued="false" />
 
        <field name="old_value" type="string" indexed="true" stored="true" multiValued="false" />
 
        <field name="new_value" type="string" indexed="true" stored="true" multiValued="false" />
 
        <field name="num_parent" type="string" indexed="true" stored="true" multiValued="false" />
 
        <field name="num_end" type="string" indexed="true" stored="true" multiValued="false" />
 
        <field name="event" type="text_general" indexed="true" stored="true" multiValued="false" />
 
        <field name="tracelevel" type="string" indexed="true" stored="true" multiValued="false" />
 
        <field name="key_user" type="string" indexed="true" stored="true" multiValued="false" />
 
        <field name="computer_name" type="string" indexed="true" stored="true" multiValued="false" />
 
        <field name="prm" type="string" indexed="true" stored="true" multiValued="false" />
 
        <field name="operation" type="string" indexed="true" stored="true" multiValued="false" />
 
        <field name="key_entity" type="string" indexed="true" stored="true" multiValued="false" />
 
        <field name="module_name" type="string" indexed="true" stored="true" multiValued="false" />
 
        <field name="_version_" type="long" indexed="true" stored="true" required="true" />    
 
        <!-- catchall field, containing all other searchable text fields (implemented
 
            via copyField further on in this schema  -->
 
        <field name="text" type="text_general" indexed="true" stored="false" multiValued="true"/>
 
    </fields>
 
 <!-- Field to use to determine and enforce document uniqueness.
 
      Unless this field is marked with required="false", it will be a required field
 
   -->
 
    <uniqueKey>num_beg</uniqueKey>
 
    <copyField source="event" dest="text"/> 

Обратите внимание, что поле _version_ не существует в источнике данных, оно необходимо для внутренних целей Solr’а, например для оптимистичных блокировок, для механизма Partial Update. Просто указать такое поле в вашем schema.xml будет достаточно: Solr сам будет управлять его содержимым.

Также нет и поля text. Мы его указали вместе с инструкцией copyField для работоспособности полнотекстового поиска через HUE (пользовательский интерфейс к Hadoop’у от Clouderа). Если вы подключите созданную коллекцию к HUE (через настроечные UI-формы), то в интерфейсе поиска по этой коллекции значение строки поиска происходит по полю text.

Теперь одно приседание. Дело в том, что в нагенерированных файлах-примерах включен один механизм поискового движка — Elevator. Он позволяет выставлять вперед результаты по определенным критериям, вроде рекламных объявлений сверху в результатах поиска в Yandex. Так вот в примере он настроен на то, что ключевое поле в вашей схеме — типа string (примеры рекламных фраз можно посмотреть в conf\elevate.xml). У нас же стоит int. Из-за этого весь наш процесс индексирования обваливался с ошибкой о несоответствии типов. Учитывая, что этот механизм неинтересен для нашей задачи, вырезаем его, а именно: открываем файл conf/solrconfig.xml в созданной директории, и удаляем (комментируем) теги и их содержимое <searchComponent name="elevator" ...">, <requestHandler name="/elevate" ...>. А заодно удаляем файл conf\elevate.xml из созданной директории, чтоб не болтался под ногами.

Затем регистрируем (клонируем) всю конфигурацию будущей коллекции в SolrCloud, а точнее, в naming service ZooKeeper, из которого все развернутые сервера SolrCloud’а читают конфигурации (и получают их обновления):

solrctl instancedir--create tm_log_avro $HOME/solr_configs_for_tm_log

Здесь параметрами являются имя будущей коллекции и путь к директории на локальном диске, содержащей файлы конфигурации. Мы её создали выше.

Ну и последний шаг на этом этапе — создание коллекции с заданным количеством шардов (равное количеству SolrCloud-серверов):

solrctl collection--create tm_log_avro -s 1

Эта команда создает коллекцию на основе зарегистрированной в ZooKeeper конфигурации. Первый параметр — имя коллекции, второй — количество шардов (возьмем для простоты 1).

Запуск процесса индексации коллекции

Сначала настроим ETL-процесс индексации. Cloudera с уважением относится к библиотеке Kite SDK, особенно к её части Morphline. По сути, компонент Morphline — это интерпретатор скриптового языка, в котором вы описываете (в виде иерархии последовательностей команд), что нужно делать с входящим потоком данных (в виде массива объектов-«записей»), как преобразовать, и что отдать дальше. Например, есть команда для чтения avro-файла. Конечно же, свои команды подключаются, в этом и фишка. Вот и Clouderа написала команды для создания Solr-индекса по всем записям входящего потока, он будет последний в скрипте.

Суть процесса:

  • на вход приходит объект-«запись» с информацией о файле;
  • запускается команда, читающая этот файл и отдающая строки этого файла в виде массива объектов-«записей»;
  • данные каждой строки преобразуются, как вам удобно (например, значение поля с датой-временем преобразуется из UTC в региональное время);
  • каждая строка преобразуется в Solr-документ и весь массив возвращается из MapReduce Mapper-а.

Для настройки такого процесса создадим файл $HOME/solr_configs_for_tm_log_morphlines/morphlines.conf со следующим наполнением:

 
 
# Specify server locations in a SOLR_LOCATOR variable; used later in
 
# variable substitutions:
 
SOLR_LOCATOR : {
 
  # Name of solr collection
 
  collection :  tm_log_avro
 
 
 
  # ZooKeeper ensemble
 
  zkHost : "hadoop-n1.custis.ru:2181,hadoop-n2.custis.ru:2181,hadoop-n3.custis.ru:2181/solr" 
 
}
 
 
 
# Specify an array of one or more morphlines, each of which defines an ETL
 
# transformation chain. A morphline consists of one or more potentially
 
# nested commands. A morphline is a way to consume records such as Flume events,
 
# HDFS files or blocks, turn them into a stream of records, and pipe the stream
 
# of records through a set of easily configurable transformations on its way to
 
# Solr.
 
morphlines : [
 
  {
 
    # Name used to identify a morphline. For example, used if there are multiple
 
    # morphlines in a morphline config file.
 
    id : morphline1
 
 
 
    # Import all morphline commands in these java packages and their subpackages.
 
    # Other commands that may be present on the classpath are not visible to this
 
    # morphline.
 
    importCommands : ["org.kitesdk.**", "org.apache.solr.**"]
 
 
 
    commands : [                   
 
      {
 
        # Parse Avro container file and emit a record for each Avro object
 
        readAvroContainer {
 
          # Optionally, require the input to match one of these MIME types:
 
          # supportedMimeTypes : [avro/binary]
 
 
 
          # Optionally, use a custom Avro schema in JSON format inline:
 
          # readerSchemaString : """<json can go here>"""
 
 
 
          # Optionally, use a custom Avro schema file in JSON format:
 
          # readerSchemaFile : /path/to/syslog.avsc
 
        }
 
      }
 
 
 
      {
 
        # Consume the output record of the previous command and pipe another
 
        # record downstream.
 
        #
 
        # extractAvroPaths is a command that uses zero or more Avro path
 
        # excodeblockssions to extract values from an Avro object. Each excodeblockssion
 
        # consists of a record output field name, which appears to the left of the
 
        # colon ':' and zero or more path steps, which appear to the right.
 
        # Each path step is separated by a '/' slash. Avro arrays are
 
        # traversed with the '[]' notation.
 
        #
 
        # The result of a path excodeblockssion is a list of objects, each of which
 
        # is added to the given record output field.
 
        #
 
        # The path language supports all Avro concepts, including nested
 
        # structures, records, arrays, maps, unions, and others, as well as a flatten
 
        # option that collects the primitives in a subtree into a flat list. In the
 
        # paths specification, entries on the left of the colon are the target Solr
 
        # field and entries on the right specify the Avro source paths. Paths are read
 
        # from the source that is named to the right of the colon and written to the
 
        # field that is named on the left.
 
        extractAvroPaths {
 
          flatten : true
 
          paths : {
 
            computer_name :/COMPUTER_NAME
 
            dtm_beg :/DTM_BEG
 
            event :/EVENT
 
            key_entity :/KEY_ENTITY
 
            key_type :/KEY_TYPE
 
            key_user :/KEY_USER
 
            module_name :/MODULE_NAME
 
            new_value :/NEW_VALUE
 
            num_beg :/NUM_BEG
 
            num_end :/NUM_END
 
            num_parent :/NUM_PARENT
 
            old_value :/OLD_VALUE
 
            operation :/OPERATION
 
            prm :/PRM
 
            tracelevel :/TRACELEVEL
 
          }
 
        }
 
      }
 
 
 
      # Consume the output record of the previous command and pipe another
 
      # record downstream.
 
      #
 
      # convert timestamp field to native Solr timestamp format
 
      # such as 2012-09-06 07:14:34 to 2012-09-06T07:14:34.000Z in UTC
 
      {
 
        convertTimestamp {
 
          field : dtm_beg
 
          inputFormats : ["yyyy-MM-dd HH:mm:ss", "yyyy-MM-dd"]
 
          inputTimezone : Europe/Moscow
 
          outputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"                                
 
          outputTimezone : UTC
 
        }
 
      }
 
 
 
      # Consume the output record of the previous command and pipe another
 
      # record downstream.
 
      #
 
      # This command deletes record fields that are unknown to Solr
 
      # schema.xml.
 
      #
 
      # Recall that Solr throws an exception on any attempt to load a document
 
      # that contains a field that is not specified in schema.xml.
 
      {
 
        sanitizeUnknownSolrFields {
 
          # Location from which to fetch Solr schema
 
          solrLocator : ${SOLR_LOCATOR}
 
        }
 
      } 
 
 
 
      # log the record at DEBUG level to SLF4J
 
      { logDebug { format : "output record: {}", args : ["@{}"] } }   
 
 
 
      # load the record into a Solr server or MapReduce Reducer
 
      {
 
        loadSolr {
 
          solrLocator : ${SOLR_LOCATOR}
 
        }
 
      }
 
    ]
 
  }
 
]

Немного про использованные команды:

  • readAvroContainer — вот тут-то нам и пригодится формат avro: в самом файле есть вся метаинформация о структуре данных, которая необходима, чтобы сформировать поток объектов-записей и пустить его по командам дальше. Если бы мы взяли, к примеру, CSV, то нам пришлось бы еще раз здесь описать имя каждого поля, его тип, длину, позицию в файле... Сейчас же эта информация автоматически сформирована на первом шаге при выгрузке из Oracle через Sqoop;
  • extractAvroPaths — указывает, какие поля из каждой входящей записи брать и в какие поля исходящей записи класть. Мы здесь указываем те имена полей, о которых «знает» наша коллекция в SolrCloud. Именно их будет передавать в индексацию последняя команда;
  • convertTimestamp — вызываясь для каждой входящей записи, преобразует строковое поле в дату-время в формате UTC;
  • loadSolr — преобразует объект-«запись» в Solr-документ. Массив этих документов будет затем передан в MapReduce Reducer, который займется непосредственно их индексированием.

Запуск

Теперь все готово для запуска. Запускаем вместе две команды:

  • org.apache.solr.hadoop.HdfsFindTool — это, по сути, реализация части linux-команды find (почему-то в hdfs до сих пор не реализована такая команда, хотя баги стоят уже давно). Результат этой команды (список) передается во вторую;
  • MapReduce-драйвер org.apache.solr.hadoop.MapReduceIndexerTool с кучкой параметров:
 
sudo -u hdfs hadoop jar /usr/lib/solr/contrib/mr/search-mr-*-job.jar org.apache.solr.hadoop.HdfsFindTool -find \
 
hdfs://$NNHOST:8020/user/$USER/solrindir/tmlogavro -type f \
 
-name 'part-m-000*.avro' |\
 
sudo -u hdfs hadoop --config /etc/hadoop/conf.cloudera.yarn \
 
jar /usr/lib/solr/contrib/mr/search-mr-*-job.jar org.apache.solr.hadoop.MapReduceIndexerTool \
 
--libjars /usr/lib/solr/contrib/mr/search-mr-1.0.0-cdh5.0.0.jar \
 
--log4j $HOME/solr_configs_for_tm_log_morphlines/log4j.properties \
 
--morphline-file $USER/solr_configs_for_tm_log_morphlines/morphlines.conf \
 
--output-dir hdfs://$NNHOST:8020/user/$USER/solroutdir \
 
--verbose --go-live --zk-host $ZKHOST \
 
--collection tm_log_avro \
 
--input-list -;

Немного про параметры второй команды:

  • jar — это путь до jar’ника с драйвером; путь — стандартный в поставке от Cloudera;
  • org.apache.solr.hadoop.MapReduceIndexerTool — непосредственно имя класса-драйвера в jar’нике;
  • libjars — дополнительно подключаемые библиотеки;
  • log4j — путь до log4j конфигурационного файла, можно взять стандартный в /usr/lib/hadoop-yarn/etc/hadoop;
  • morphline-file — путь до скрипт-файла Morphline, созданного выше;
  • output-dir — имя директории в HDFS, куда будут сохранены все индексы до слива (merge) в сервера SolrCloud;
  • input-list — список файлов для индексации. Тире после параметра означает, что программа возьмет список из standard input;
  • переменная $ZKHOST настроена у нас в hadoop-n1.custis.ru:2181,hadoop-n2.custis.ru:2181,hadoop-n3.custis.ru:2181/solr.

Эта команда создаст и запустит MapReduce-задачи.

  • Задача Map берет свой файл, пропускает его через Morphline ETL, превратив входящие записи логов в документы Solr, и передает их в следующую задачу. Будет столько экземпляров задач, сколько есть файлов.
  • Задача Reduce берет входные документы и индексирует их в отдельную директорию на диске (в поддиректорию <output-dir>). Экземпляров столько же.
  • Так называемая задача Reduce-Only берет все индексы из папок и заливает их (merge) в SolrCloud. Экземпляров задачи столько, сколько есть шардов у вас в коллекции. В нашем случае — 1.

Немного итогов

MapReduceIndexerTool, да и сам Solr, оказался очень капризен по поводу доступной оперативной памяти. С нашими структурами каждая задача Reduce, индексирующая файл из списка, требовала, чтобы ей было доступно оперативной памяти (Java Heap Size) в объеме примерно 1/2 от размера несжатого файла, иначе — OutOfMemoryError. Поэтому при выгрузке sqoop’ом в файлы управляйте их размером через, например, параметр m (количество mapper’ов, создающих файлы).

Также, несмотря на количество доступной памяти в задачах Map и Reduce, успешность прохождения последнего этапа напрямую зависит от количества доступной памяти в Solr Server’е и от размера уже проиндексированных данных в коллекции. По нашим структурам, например, для merge’а 30 Гб на один шард хватало 6 Гб Java Heap Size, выделенной на один Solr instance.

Есть еще одна особенность — используемый механизм merge’а индексов никак не идентифицирует дубли записей. Если в вашем индексируемом файле есть записи, которые уже есть в коллекции, — они задублируются. Поэтому при повторной индексации следите, чтобы каждый раз получать в файлах уникальный набор записей. Это можно вполне легко устроить, воспользовавшись фичей sqoop’а по инкрементальной выгрузке данных (через sqoop job). Только не забудьте перед запуском выгрузки убирать старые файлы из папки, а то ведь еще раз проиндексируются.