Роман Корешков, наш руководитель проектов дирекции развития технологий, опубликовал в корпоративном блоге на «Хабрахабре» статью, посвященную индексации логов бизнес-приложений в Hadoop. Как вынести логи из корпоративных приложений? Каким способом можно реализовать возможность поиска и просмотра списка событий? В чем заключаются особенности настроек и сложности в процессе решения задачи с помощью Hadoop? Об этом — в материале «Как проиндексировать логи бизнес-приложений в Hadoop» на сайте.

Содержание

Введение

У одного из наших клиентов возникла задача вынести логи из большинства корпоративных приложений и их баз данных «куда-нибудь» — уж больно с ними много возни: растут как на дрожжах, чисти их периодически, а к некоторым еще и доступ должен быть обеспечен в течение многих лет, да еще и анализ хочется проводить системным образом. Конечно же, вынести логи — это не первичная цель. По совокупности требований мы выбрали Hadoop, версию от Cloudera (CDH 5). Требования указывали, что решение, помимо прочего, должно предоставлять возможность поиска и просмотра списка событий (из логов) по заданным критериям, причем желательно быстрого. Причем некоторые приложения также должны быть переделаны, чтобы формы просмотра логов стали использовать Hadoop вместо своих баз данных. Как одно из решений — использовать поисковый модуль SolrCloud, который входит в комплект Hadoop от Cloudera. В Cloudera «из коробки» входят тулзы для выгрузки данных из баз данных приложений и их индексации пачкой (не построчно). Однако такой способ оказался хоть и рабочим, но более трудоемким и непредсказуемым в настройке, чем, скажем, если бы мы использовали Impala для выборки данных. Эта статья описывает детали настройки, а также встреченные в процессе работы особенности. Сценарий такой:

  1. Выгружаем данные из Oracle в файлы на HDFS. Формат файла — avro. Инструмент: sqoop.
    Формат avro имеет много плюсов: он бинарный, данные хорошо сжимаются, с ним вы не будете париться с переводами каретки и с запятыми в текстовых полях, как с CSV, и в самом файле есть схема данных, поддерживает Schema Evolution. Вообще, в Hadoop’е avro пропагандируется как унифицированный формат хранения и передачи данных между разными компонентами, его поддерживают многие инструменты и компоненты. И есть еще один плюс именно для нашей задачи, об этом ниже.
  2. Создаем «коллекцию» в SolrCloud. Инструмент: solrctl.
    Коллекция — полный логический индекс в SolrCloud. Она связана с набором конфигурационных файлов и состоит из одного или нескольких шардов (shard). Если количество шардов более одного — это распределенный индекс.
  3. Запускаем MapReduce-драйвер, который:
    • прочитает все записи из avro-файла;
    • пропустит их через ETL-процесс, написанный в виде morphline-скрипта; результат этого процесса — шард с новыми данными (индекс файлы в формате Solr’а, выложенные в указанную директорию HDFS);
    • сольет (merge) выложенный шард в коллекцию активного SolrCloud без перевода его в offline, по-живому (go-live), так сказать.

Инструмент: команда hadoop, запускающая драйвер org.apache.solr.hadoop.MapReduceIndexerTool, выполняющий сию последовательность.

Все запускаем с основного NameNode’а, хотя это не принципиально. Итак, по шагам...

Выгружаем данные из Oracle в avro файлы

sqoop import --connect jdbc:oracle:thin:@oraclehost:1521/SERVICENAME \
 
--username ausername --password apassword --table ASCHEMA.LOG_TABLE \
 
--as-avrodatafile --compression-codec snappy \
 
-m 16 --split-by NUM_BEG  \
 
--map-column-java NUM_BEG=Integer,DTM_BEG=String,KEY_TYPE=String,OLD_VALUE=String,NEW_VALUE=String,NUM_PARENT=Integer,\
 
NUM_END=Integer,EVENT=String,TRACELEVEL=String,KEY_USER=String,COMPUTER_NAME=String,PRM=String,OPERATION=Integer,\
 
KEY_ENTITY=String,MODULE_NAME=String \
 
--target-dir /user/$USER/solrindir/tmlogavro

Немного о параметрах:

Создаем коллекцию

Здесь пользуемся утилитой solrctl для управления развернутым SolrCloud.

Сначала на локальном диске генерируем структуру файлов будущей коллекции, так называемую Collection Instance Directory. В ней мы будем делать/изменять настройки коллекции на локальном диске и затем клонировать их в сервис конфигурации zookeeper, из которого SolrCloud читает необходимые для работы настройки:

solrctl instancedir --generate $HOME/solr_configs_for_tm_log

Здесь параметром является путь до создаваемой локальной директории.

По умолчанию созданные в директории файлы уже наполнены демо-настройкой схемы данных и инструкций к поиску, и нам необходимо удалить лишнее.

Открываем файл conf/schema.xml в созданной директории. Это основной файл коллекции, описывающий структуру индексируемых данных. Удаляем тэг <fields> со своим содержимым, тэг <uniquekey> и все <copyField>. Вместо этого вставляем следующее:

<fields>
        <field name="num_beg" type="int" indexed="true" stored="true" multiValued="false" />
 
        <field name="dtm_beg" type="string" indexed="true" stored="true" multiValued="false" />
 
        <field name="key_type" type="string" indexed="true" stored="true" multiValued="false" />
 
        <field name="old_value" type="string" indexed="true" stored="true" multiValued="false" />
 
        <field name="new_value" type="string" indexed="true" stored="true" multiValued="false" />
 
        <field name="num_parent" type="string" indexed="true" stored="true" multiValued="false" />
 
        <field name="num_end" type="string" indexed="true" stored="true" multiValued="false" />
 
        <field name="event" type="text_general" indexed="true" stored="true" multiValued="false" />
 
        <field name="tracelevel" type="string" indexed="true" stored="true" multiValued="false" />
 
        <field name="key_user" type="string" indexed="true" stored="true" multiValued="false" />
 
        <field name="computer_name" type="string" indexed="true" stored="true" multiValued="false" />
 
        <field name="prm" type="string" indexed="true" stored="true" multiValued="false" />
 
        <field name="operation" type="string" indexed="true" stored="true" multiValued="false" />
 
        <field name="key_entity" type="string" indexed="true" stored="true" multiValued="false" />
 
        <field name="module_name" type="string" indexed="true" stored="true" multiValued="false" />
 
        <field name="_version_" type="long" indexed="true" stored="true" required="true" />    
 
        <!-- catchall field, containing all other searchable text fields (implemented
 
            via copyField further on in this schema  -->
 
        <field name="text" type="text_general" indexed="true" stored="false" multiValued="true"/>
 
    </fields>
 
 <!-- Field to use to determine and enforce document uniqueness.
 
      Unless this field is marked with required="false", it will be a required field
 
   -->
 
    <uniqueKey>num_beg</uniqueKey>
 
    <copyField source="event" dest="text"/> 

Обратите внимание, что поле _version_ не существует в источнике данных, оно необходимо для внутренних целей Solr’а, например для оптимистичных блокировок, для механизма Partial Update. Просто указать такое поле в вашем schema.xml будет достаточно: Solr сам будет управлять его содержимым.

Также нет и поля text. Мы его указали вместе с инструкцией copyField для работоспособности полнотекстового поиска через HUE (пользовательский интерфейс к Hadoop’у от Clouderа). Если вы подключите созданную коллекцию к HUE (через настроечные UI-формы), то в интерфейсе поиска по этой коллекции значение строки поиска происходит по полю text.

Теперь одно приседание. Дело в том, что в нагенерированных файлах-примерах включен один механизм поискового движка — Elevator. Он позволяет выставлять вперед результаты по определенным критериям, вроде рекламных объявлений сверху в результатах поиска в Yandex. Так вот в примере он настроен на то, что ключевое поле в вашей схеме — типа string (примеры рекламных фраз можно посмотреть в conf\elevate.xml). У нас же стоит int. Из-за этого весь наш процесс индексирования обваливался с ошибкой о несоответствии типов. Учитывая, что этот механизм неинтересен для нашей задачи, вырезаем его, а именно: открываем файл conf/solrconfig.xml в созданной директории, и удаляем (комментируем) теги и их содержимое <searchComponent name="elevator" ...">, <requestHandler name="/elevate" ...>. А заодно удаляем файл conf\elevate.xml из созданной директории, чтоб не болтался под ногами.

Затем регистрируем (клонируем) всю конфигурацию будущей коллекции в SolrCloud, а точнее, в naming service ZooKeeper, из которого все развернутые сервера SolrCloud’а читают конфигурации (и получают их обновления):

solrctl instancedir--create tm_log_avro $HOME/solr_configs_for_tm_log

Здесь параметрами являются имя будущей коллекции и путь к директории на локальном диске, содержащей файлы конфигурации. Мы её создали выше.

Ну и последний шаг на этом этапе — создание коллекции с заданным количеством шардов (равное количеству SolrCloud-серверов):

solrctl collection--create tm_log_avro -s 1

Эта команда создает коллекцию на основе зарегистрированной в ZooKeeper конфигурации. Первый параметр — имя коллекции, второй — количество шардов (возьмем для простоты 1).

Запуск процесса индексации коллекции

Сначала настроим ETL-процесс индексации. Cloudera с уважением относится к библиотеке Kite SDK, особенно к её части Morphline. По сути, компонент Morphline — это интерпретатор скриптового языка, в котором вы описываете (в виде иерархии последовательностей команд), что нужно делать с входящим потоком данных (в виде массива объектов-«записей»), как преобразовать, и что отдать дальше. Например, есть команда для чтения avro-файла. Конечно же, свои команды подключаются, в этом и фишка. Вот и Clouderа написала команды для создания Solr-индекса по всем записям входящего потока, он будет последний в скрипте.

Суть процесса:

Для настройки такого процесса создадим файл $HOME/solr_configs_for_tm_log_morphlines/morphlines.conf со следующим наполнением:

 
 
# Specify server locations in a SOLR_LOCATOR variable; used later in
 
# variable substitutions:
 
SOLR_LOCATOR : {
 
  # Name of solr collection
 
  collection :  tm_log_avro
 
 
 
  # ZooKeeper ensemble
 
  zkHost : "hadoop-n1.custis.ru:2181,hadoop-n2.custis.ru:2181,hadoop-n3.custis.ru:2181/solr" 
 
}
 
 
 
# Specify an array of one or more morphlines, each of which defines an ETL
 
# transformation chain. A morphline consists of one or more potentially
 
# nested commands. A morphline is a way to consume records such as Flume events,
 
# HDFS files or blocks, turn them into a stream of records, and pipe the stream
 
# of records through a set of easily configurable transformations on its way to
 
# Solr.
 
morphlines : [
 
  {
 
    # Name used to identify a morphline. For example, used if there are multiple
 
    # morphlines in a morphline config file.
 
    id : morphline1
 
 
 
    # Import all morphline commands in these java packages and their subpackages.
 
    # Other commands that may be present on the classpath are not visible to this
 
    # morphline.
 
    importCommands : ["org.kitesdk.**", "org.apache.solr.**"]
 
 
 
    commands : [                   
 
      {
 
        # Parse Avro container file and emit a record for each Avro object
 
        readAvroContainer {
 
          # Optionally, require the input to match one of these MIME types:
 
          # supportedMimeTypes : [avro/binary]
 
 
 
          # Optionally, use a custom Avro schema in JSON format inline:
 
          # readerSchemaString : """<json can go here>"""
 
 
 
          # Optionally, use a custom Avro schema file in JSON format:
 
          # readerSchemaFile : /path/to/syslog.avsc
 
        }
 
      }
 
 
 
      {
 
        # Consume the output record of the previous command and pipe another
 
        # record downstream.
 
        #
 
        # extractAvroPaths is a command that uses zero or more Avro path
 
        # excodeblockssions to extract values from an Avro object. Each excodeblockssion
 
        # consists of a record output field name, which appears to the left of the
 
        # colon ':' and zero or more path steps, which appear to the right.
 
        # Each path step is separated by a '/' slash. Avro arrays are
 
        # traversed with the '[]' notation.
 
        #
 
        # The result of a path excodeblockssion is a list of objects, each of which
 
        # is added to the given record output field.
 
        #
 
        # The path language supports all Avro concepts, including nested
 
        # structures, records, arrays, maps, unions, and others, as well as a flatten
 
        # option that collects the primitives in a subtree into a flat list. In the
 
        # paths specification, entries on the left of the colon are the target Solr
 
        # field and entries on the right specify the Avro source paths. Paths are read
 
        # from the source that is named to the right of the colon and written to the
 
        # field that is named on the left.
 
        extractAvroPaths {
 
          flatten : true
 
          paths : {
 
            computer_name :/COMPUTER_NAME
 
            dtm_beg :/DTM_BEG
 
            event :/EVENT
 
            key_entity :/KEY_ENTITY
 
            key_type :/KEY_TYPE
 
            key_user :/KEY_USER
 
            module_name :/MODULE_NAME
 
            new_value :/NEW_VALUE
 
            num_beg :/NUM_BEG
 
            num_end :/NUM_END
 
            num_parent :/NUM_PARENT
 
            old_value :/OLD_VALUE
 
            operation :/OPERATION
 
            prm :/PRM
 
            tracelevel :/TRACELEVEL
 
          }
 
        }
 
      }
 
 
 
      # Consume the output record of the previous command and pipe another
 
      # record downstream.
 
      #
 
      # convert timestamp field to native Solr timestamp format
 
      # such as 2012-09-06 07:14:34 to 2012-09-06T07:14:34.000Z in UTC
 
      {
 
        convertTimestamp {
 
          field : dtm_beg
 
          inputFormats : ["yyyy-MM-dd HH:mm:ss", "yyyy-MM-dd"]
 
          inputTimezone : Europe/Moscow
 
          outputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"                                
 
          outputTimezone : UTC
 
        }
 
      }
 
 
 
      # Consume the output record of the previous command and pipe another
 
      # record downstream.
 
      #
 
      # This command deletes record fields that are unknown to Solr
 
      # schema.xml.
 
      #
 
      # Recall that Solr throws an exception on any attempt to load a document
 
      # that contains a field that is not specified in schema.xml.
 
      {
 
        sanitizeUnknownSolrFields {
 
          # Location from which to fetch Solr schema
 
          solrLocator : ${SOLR_LOCATOR}
 
        }
 
      } 
 
 
 
      # log the record at DEBUG level to SLF4J
 
      { logDebug { format : "output record: {}", args : ["@{}"] } }   
 
 
 
      # load the record into a Solr server or MapReduce Reducer
 
      {
 
        loadSolr {
 
          solrLocator : ${SOLR_LOCATOR}
 
        }
 
      }
 
    ]
 
  }
 
]

Немного про использованные команды:

Запуск

Теперь все готово для запуска. Запускаем вместе две команды:

 
sudo -u hdfs hadoop jar /usr/lib/solr/contrib/mr/search-mr-*-job.jar org.apache.solr.hadoop.HdfsFindTool -find \
 
hdfs://$NNHOST:8020/user/$USER/solrindir/tmlogavro -type f \
 
-name 'part-m-000*.avro' |\
 
sudo -u hdfs hadoop --config /etc/hadoop/conf.cloudera.yarn \
 
jar /usr/lib/solr/contrib/mr/search-mr-*-job.jar org.apache.solr.hadoop.MapReduceIndexerTool \
 
--libjars /usr/lib/solr/contrib/mr/search-mr-1.0.0-cdh5.0.0.jar \
 
--log4j $HOME/solr_configs_for_tm_log_morphlines/log4j.properties \
 
--morphline-file $USER/solr_configs_for_tm_log_morphlines/morphlines.conf \
 
--output-dir hdfs://$NNHOST:8020/user/$USER/solroutdir \
 
--verbose --go-live --zk-host $ZKHOST \
 
--collection tm_log_avro \
 
--input-list -;

Немного про параметры второй команды:

Эта команда создаст и запустит MapReduce-задачи.

Немного итогов

MapReduceIndexerTool, да и сам Solr, оказался очень капризен по поводу доступной оперативной памяти. С нашими структурами каждая задача Reduce, индексирующая файл из списка, требовала, чтобы ей было доступно оперативной памяти (Java Heap Size) в объеме примерно 1/2 от размера несжатого файла, иначе — OutOfMemoryError. Поэтому при выгрузке sqoop’ом в файлы управляйте их размером через, например, параметр m (количество mapper’ов, создающих файлы).

Также, несмотря на количество доступной памяти в задачах Map и Reduce, успешность прохождения последнего этапа напрямую зависит от количества доступной памяти в Solr Server’е и от размера уже проиндексированных данных в коллекции. По нашим структурам, например, для merge’а 30 Гб на один шард хватало 6 Гб Java Heap Size, выделенной на один Solr instance.

Есть еще одна особенность — используемый механизм merge’а индексов никак не идентифицирует дубли записей. Если в вашем индексируемом файле есть записи, которые уже есть в коллекции, — они задублируются. Поэтому при повторной индексации следите, чтобы каждый раз получать в файлах уникальный набор записей. Это можно вполне легко устроить, воспользовавшись фичей sqoop’а по инкрементальной выгрузке данных (через sqoop job). Только не забудьте перед запуском выгрузки убирать старые файлы из папки, а то ведь еще раз проиндексируются.