Коллектор sFlow на Logstash
В данном посте будет описана процедура установки коллектора sFlow на базе ELK на CentOS 7 для сбора трафика со свитча Juniper.
Компоненты:
Logstash
— система принимающая данные, фильтрующая их и передающая их в Elasticsearch.Elasticsearch
— база данных, которая хранит информацию и осуществляет по ней поиск.Kibana
— служит для визуального представления данных, хранящихся в Elasticsearch.
Взаимодействие компонентов выглядит так:
10.10.10.10
- адрес сервера CentOS, где установлен ELK
Установка Java JDK
Для работы Elasticsearch рекомендуется Java версии 8.
wget --no-check-certificate --no-cookies --header "Cookie: oraclelicense=accept-securebackup-cookie" http://download.oracle.com/otn-pub/java/jdk/8u111-b14/jdk-8u111-linux-x64.rpm
rpm -Uhv jdk-8u111-linux-x64.rpm
Проверяем версию
java -version
Установка Elacticsearch
rpm --import http://packages.elastic.co/GPG-KEY-elasticsearch
nano /etc/yum.repos.d/elasticsearch.repo
[elasticsearch-5.x] name=Elasticsearch repository for 5.x packages baseurl=https://artifacts.elastic.co/packages/5.x/yum gpgcheck=1 gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch enabled=1 autorefresh=1 type=rpm-md
yum install elasticsearch
systemctl daemon-reload
systemctl start elasticsearch.service
systemctl enable elasticsearch
Проверяем статус сервиса
systemctl status elasticsearch
Создаем сервис для FirewallD:
cp /usr/lib/firewalld/services/ssh.xml /etc/firewalld/services/elasticsearch.xml
nano /etc/firewalld/services/elasticsearch.xml
<service>
<short>Elasticsearch</short>
<description>Elasticsearch</description>
<port protocol="tcp" port="9200"/>
</service>
Открываем доступ в сетевом экране:
firewall-cmd --permanent --zone=public --add-service=elasticsearch
firewall-cmd --reload
Проверяем сетевую доступность Elasticsearch
ss -naut | grep :9200
curl -X GET http://localhost:9200
curl -X GET 'http://localhost:9200/_nodes?pretty'
Изменяем настройки
nano /etc/elasticsearch/elasticsearch.yml
cluster.name: sflowcluster
node.name: "sflowcollector"
network.host: localhost
Применяем изменения
systemctl restart elasticsearch
Установка Logstash
nano /etc/yum.repos.d/logstash.repo
[logstash-5.x] name=Logstash repository for 5.x packages baseurl=https://artifacts.elastic.co/packages/5.x/yum gpgcheck=1 gpgkey=http://packages.elasticsearch.org/GPG-KEY-elasticsearch enabled=1 autorefresh=1 type=rpm-md
yum install logstash
systemctl daemon-reload
systemctl start logstash
systemctl enable logstash
Проверяем статус сервиса
systemctl status logstash
Установка Kibana
nano /etc/yum.repos.d/kibana.repo
[kibana-5.x] name=Kibana repository for 5.x packages baseurl=https://artifacts.elastic.co/packages/5.x/yum gpgcheck=1 gpgkey=http://packages.elastic.co/GPG-KEY-elasticsearch enabled=1 autorefresh=1 type=rpm-md
yum install kibana.x86_64
systemctl daemon-reload
systemctl start kibana
systemctl enable kibana
Проверяем статус сервиса
systemctl status kibana
Проверяем автозагрузку всех сервисов
systemctl list-unit-files --type=service | grep -E 'elasticsearch|kibana|logstash'
Проверяем сетевую доступность Kibana
ss -naut | grep :5601
Добавляем в сетевой фильтр
cp /usr/lib/firewalld/services/ssh.xml /etc/firewalld/services/kibana.xml
nano /etc/firewalld/services/kibana.xml
<service>
<short>Kibana</short>
<description>Kibana</description>
<port protocol="tcp" port="5601"/>
</service>
Открываем доступ к Kibana через сетевой фильтр
firewall-cmd --permanent --zone=public --add-service=kibana
firewall-cmd --reload
Изменяем настройки
nano /etc/kibana/kibana.yml
server.host: 10.10.10.10
server.name: "sflowcollector"
Применяем изменения
systemctl restart kibana
Настройка Logstash
cp /usr/lib/firewalld/services/ssh.xml /etc/firewalld/services/sflow.xml
nano /etc/firewalld/services/sflow.xml
<service>
<short>sflow</short>
<description>sflow</description>
<port protocol="udp" port="6343"/>
</service>
Открываем доступ к sflow через сетевой фильтр
firewall-cmd --permanent --zone=public --add-service=sflow
firewall-cmd --reload
Проверяем что все добавленные сервисы открыты в сетевом фильтре
firewall-cmd --list-all
Устанавливаем logstash плагины для поддержки sflow
/usr/share/logstash/bin/logstash-plugin install logstash-codec-sflow
/usr/share/logstash/bin/logstash-plugin install logstash-filter-translate
Проверяем, что плагины есть в списке
/usr/share/logstash/bin/logstash-plugin list
Настраиваем Logstash на обработку sflow
nano /etc/logstash/conf.d/config.conf
input {
udp {
port => 6343
codec => sflow {}
}
}
filter {
translate {
field => protocol
dictionary => [ "1", "ETHERNET",
"11", "IP"
]
fallback => "UNKNOWN"
destination => protocol
override => true
}
translate {
field => eth_type
dictionary => [ "2048", "IP",
"33024", "802.1Q VLAN"
]
fallback => "UNKNOWN"
destination => eth_type
override => true
}
translate {
field => vlan_type
dictionary => [ "2048", "IP"
]
fallback => "UNKNOWN"
destination => vlan_type
override => true
}
translate {
field => ip_protocol
dictionary => [ "6", "TCP",
"17", "UDP",
"50", "Encapsulating Security Payload"
]
fallback => "UNKNOWN"
destination => ip_protocol
override => true
}
mutate {
convert => { "src_port" => "integer"
"dst_port" => "integer" }
}
}
output {
elasticsearch {
index => "sflow-%{+YYYY.MM.dd}"
hosts => ["localhost"]
}
}
В файле конфигурации Logstash мы настроили следующее:
- открыть порт udp
6343
- обрабатывать входящий трафик на порту udp
6343
кодекомsflow
- преобразовывать номера протоколов в их названия
- конвертировать тип данных портов в целочисленный
- хранить данные в elastichsearch в индексе
sflow-*
Проверяем корректность конфигурационного файла
/usr/share/logstash/bin/logstash --path.settings /etc/logstash/ -f /etc/logstash/conf.d/config.conf -t
Применяем изменения
systemctl restart logstash
Проверяем сетевую доступность sflow
ss -naut | grep :6343
Настраиваем Juniper
set protocols sflow polling-interval 30 sample-rate ingress 1000 egress 1000
set protocols sflow collector 10.10.10.10 udp-port 6343
set protocols sflow interfaces ge-0/0/47
Настраиваем Kibana
Проверяем что индекс в Elastichsearch создался
curl -X GET 'http://localhost:9200/sflow-*
Заходим на Kibana
http://10.10.0.52:5601/
настраиваем индекс по умолчанию Management Index name or pattern
sflow-*
.
Пробуем отобразить интересующий трафик
- Добавляем поля Discover -> Add fields
(src_ip, dst_ip, dst_port, ip_protocol)
- Пробуем поиск по источнику Discover ->
src_ip: 10.10.*
Может потребоваться удалить сохраненную информацию в Elastisearch
curl -X DELETE 'http://localhost:9200/sflow-*
Настраиваем удаление старых indeces
Со временем количество данных, хранящихся в Elaasticsearch может занять все свободное место на диске. Если ELK используется только для просмотра новых данных, то можно настроить удаление старых indeces. Проверяем количество indeces и занимаемое ими место
curl -X GET http://localhost:9200/_cat/indices?v
Установим утилиту curator, которая позволит нам удалять indeces
nano /etc/yum.repos.d/curator.repo
[curator-4]
name=CentOS/RHEL 7 repository for Elasticsearch Curator 4.x packages
baseurl=http://packages.elastic.co/curator/4/centos/7
gpgcheck=1
gpgkey=http://packages.elastic.co/GPG-KEY-elasticsearch
enabled=1
yum install elasticsearch-curator.x86_64
Проверим indeces, старше 30 дней
curator_cli show_indices --filter_list '[{"filtertype":"age","source":"name","direction":"older", "timestring": "'%Y.%m.%d'", "unit":"days","unit_count":30},{"filtertype":"pattern","kind":"prefix","value":"sflow-"}]'
Добавляем задание в cron на удаление indeces старше 90 дней
cp /etc/cron.daily/logrotate /etc/cron.daily/curator
nano /etc/cron.daily/curator
#!/bin/sh
/usr/bin/curator_cli delete_indices --filter_list '[{"filtertype":"age","source":"name","direction":"older", "timestring": "'%Y.%m.%d'", "unit":"days","unit_count":90},{"filtertype":"pattern","kind":"prefix","value":"sflow-"}]'
Стоить заметить, что это минимальная настройка для проверки работоспособности. При вводе ELK в эксплуатацию следует подумать о аутентификации, шифровании, оптимизации работы Java.