下载试用

博客

自定义命令

Posted on 2017年03月17日

对海量、复杂的数据处理,Splunk提供了简单易行的解决方案。在日常数据处理过程中,可能会遇到内置命令无法完成的情况,Splunk提供了命令扩展的方法。

 

 

Splunk自定义命令使用Python实现,从stdin读取输入脚本进行处理,并把输出写入到stdout中,SDK提供了对两种数据类型的处理:流式、非流式。

流式

对每个事件进行处理,并把结果写入到stdout中,比如在每个事件中添加一个字段。

非流式

在操作之前等待全部数据,比如内置stats命令,等待全部的数据后再统计计算。

Splunk Python SDK包含多个模板,其中splunklib.searchcommands模块提供几种基础类,通过继承它我们就可以轻松构建自定义命令。

 

searchcommands模块结构

图片1

SearchCommand类

图片2

SearchCommand类提供了基础的方法,比如:process()处理数据,prepare()提供了钩子,_process_protocol_v1()和_process_protocol_v2()是SCP (Search Command Protocol)的两个版本的实现。

图片3

EventingCommand用于对管道符前搜索结果的过滤、分组、排序和增加事件。比如:内置的sort_、dedup_和cluster_。

图片4

GeneratingCommand基于命令的参数生成事件,不接收输入,并且必须是管道上的第一个命令。比如第一个管道符的search、inputcsv命令等。

使用时只需要重载generate生成器函数,并使用yield关键字返回数据即可。另外,SDK提供的类装饰器@Configuration()中,local参数可以配置命令运行方式,默认在search head上运行,参数streaming可以限制事件必须是流式数据,generates_timeorder参数为True时,事件按照时间排序。

图片5

ReportingCommand处理搜索结果并生成报告。使用时需要实现map方法,迭代一组事件并生成dict或list的实例。比如:statstoptimechart命令等。

图片6

StreamingCommand处理流式数据,实现stream()方法作为生成器函数,迭代一组事件并生成dict或list的实例。默认情况下,Splunk在search head上运行流式命令或在一个或多个indexer上远程运行流式命令。

自定义命令需要放到某个app下,几个必要的文件:

$SPLUNK_HOME/etc/app/<app_name>/bin/command.py 自定义命令;

$SPLUNK_HOME/etc/app/<app_name>/default/commands.conf 自定义命令配置文件。

如果需要用到SDK,需要把SDK路径加到$PYTHON_PATH中,或者拷贝到脚本的同级目录。

 

下面我们看一个例子

countmatches自定义命令用于统计字段中单词的数量,用法
| inputlookup tweets | countmatches fieldname=word_count pattern=\\w+
(tweets.csv.gz下载地址: https://raw.githubusercontent.com/splunk/splunk-sdk-python/master/examples/searchcommands_app/package/lookups/tweets.csv.gz)

代码结构:

├── <app_name>

├── bin

│   ├── splunklib …………… SDK

│   │   └── searchcommands ……. splunklib.searchcommands 模块

│   └── countmatches.py ………. 自定义命令

└── default

└── commands.conf ………… 自定义命令配置文件

 

commands.conf 文件

[countmatches]

filename = countmatches.py

chunked = true

 

countmatches.py 文件

#!/usr/bin/env python

# coding=utf-8

#

# Copyright © 2011-2015 Splunk, Inc.

 

from __future__ import absolute_import, division, print_function, unicode_literals

 

import sys

from splunklib.searchcommands import dispatch, StreamingCommand, Configuration, Option, validators

 

@Configuration()

class CountMatchesCommand(StreamingCommand):  # 继承 StreamingCommand

fieldname = Option(

doc=”’

**Syntax:** **fieldname=***<fieldname>*

**Description:** Name of the field that will hold the match count”’,

require=True, validate=validators.Fieldname())  # 参数 fieldname

 

pattern = Option(

doc=”’

**Syntax:** **pattern=***<regular-expression>*

**Description:** Regular expression pattern to match”’,

require=True, validate=validators.RegularExpression())  # 参数 pattern,接收正则表达式

 

def stream(self, records):  # 重载 stream() 方法

self.logger.debug(‘CountMatchesCommand: %s’, self)  # logs command line

pattern = self.pattern

for record in records:

count = 0L

for fieldname in self.fieldnames:

matches =

pattern.findall(unicode(record[fieldname].decode(“utf-8”)))

count += len(matches)

record[self.fieldname] = count  # 统计数量

yield record

 

dispatch(CountMatchesCommand, sys.argv, sys.stdin, sys.stdout, __name__)

 

图片7

 

可以看到countmatches命令对字段text使用正则表达式“\\w+”进行字数统计,并把结果加入字段word_count中。

 

Splunk开发参考:

开发指南,http://dev.splunk.com/python

python-sdk源码,https://github.com/splunk/splunk-sdk-python

样例,https://github.com/splunk/splunk-sdk-python/tree/master/examples

 

By Derek

问问专家
Splunk销售与支持

  • 联系电话:400-067-1005
  • 电子邮件:contact@10data.com


沪ICP备11017547 沪公网安备 31011502002368号 ©版权所有 2005-2017 上海天旦网络科技发展有限公司(Netis)

GO TOP

返回