Travis's Tech Blog

Noting Travis's daily life


K-临近算法介绍和实践

K-临近算法概述

KNN算法(K-Nearest Neighbors)算法是机器学习领域广泛使用的分类算法之一,所谓KNN,说的就是每个样本的分类都可以用它最接近的K个邻居来代表。

KNN算法的基本思路是:

1、给定一个训练集数据,每个训练集数据都是已经分好类的。

2、设定一个初始的测试数据a,计算a到训练集所有数据的欧几里得距离,并排序。

3、选出训练集中离a距离最近的K个训练集数据。

4、比较k个训练集数据,选出里面出现最多的分类类型,此分类类型即为最终测试数据a的分类。

Python中的KNN相关API

在python中,我们可以很容易使用sklearn中的neighbors module来创建我们的KNN模型。

API文档中,neighbors模块中的方法如下:

接下来我们用一个简单的例子只管阐述一下KNN算法的实现。

K临近算法的hello world

首先我们要创建二维空间上的一些点用于我们测试:

from sklearn.neighbors import NearestNeighbors
import numpy as np
X = np.array([[-1, -1], [-2, -1], [-3, -2], [1, 1], [2, 1], [3, 2]])

由以上代码可以看到,我们在二维空间中创建了6个测试点用于我们的实验,接下来我们便是要使用KNN算法,训练出一个合适我们这六个点的一个分类模型。

#NearestNeighbors用到的参数解释 
#n_neighbors=5,默认值为5,表示查询k个最近邻的数目 
#algorithm='auto',指定用于计算最近邻的算法,auto表示试图采用最适合的算法计算最近邻 
#fit(X)表示用X来训练算法 
nbrs = NearestNeighbors(n_neighbors=2, algorithm="ball_tree").fit(X) 

这样,我们最简单的一个KNN的分类器就完成了,接下来看看效果。

#返回的indices是距离该点较近的k个点的下标,distance则是距离
distances, indices = nbrs.kneighbors(X)

结果如下图所示:

第一个矩阵是distances矩阵,第二个则表示k个距离该点较近的点的坐标。

还可以输入以下代码可视化结果:

#输出的是求解n个最近邻点后的矩阵图,1表示是最近点,0表示不是最近点  
print nbrs.kneighbors_graph(X).toarray() 

使用KNN算法用于监督学习的时候也是非常简单的:

如下图所示:

使用K近邻算法检测异常操作

在黑客入侵Web服务器之后,通常会使用一些系统漏洞进行提权,获得服务器的root权限,在接下来的工作中,我们通过搜集Linux服务器的bash操作日志,通过训练识别出特定用户的使用习惯,然后进一步识别出异常操作。

数据情况大概如下:

训练数据中包含50个用户的操作日志,每个日志包含了15000条操作命令,其中前5000条为正常操作,后面10000条随机包含有异常操作。为了方便训练,我们将100个命令视为一个操作序列,在label.txt中是对应每个用户的后100个操作序列中是否异常的标签。正常为0,异常为1。

源码如下:

# -*- coding:utf-8 -*-
import sys
import urllib
import re
from hmmlearn import hmm
import numpy as np
from sklearn.externals import joblib
import nltk
import matplotlib.pyplot as plt
from nltk.probability import FreqDist
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import classification_report
from sklearn import metrics
#测试样本数
N=90
def load_user_cmd(filename):
	cmd_list=[]
	dist_max=[]
	dist_min=[]
	dist=[]
	with open(filename) as f:
	    i=0
	    x=[]
	    for line in f:
	        line=line.strip('\n')
	        x.append(line)
	        dist.append(line)
	        i+=1
	        #每100条命令组成一个操作序列
	        if i == 100:
	            cmd_list.append(x)
	            x=[]
	            i=0
	#统计最频繁使用的前50个命令和最少使用的50个命令
	fdist =list(FreqDist(dist).keys())
	dist_max=set(fdist[0:50])
	dist_min = set(fdist[-50:])
	return cmd_list,dist_max,dist_min

#特征化用户使用习惯
def get_user_cmd_feature(user_cmd_list,dist_max,dist_min):
	user_cmd_feature=[]
	for cmd_block in user_cmd_list:
    	#以100个命令为统计单元,作为一个操作序列,去重后的操作命令个数作为特征
    	#将list转为set去重
    	f1=len(set(cmd_block))
    	#FreqDist转为统计字典转化为命令:出现次数的形式
    	fdist = list(FreqDist(cmd_block).keys())
    	#最频繁使用的10个命令
    	f2=fdist[0:10]
    	#最少使用的10个命令
    	f3 = fdist[-10:]
    	f2 = len(set(f2) & set(dist_max))
    	f3 = len(set(f3) & set(dist_min))
    	#返回该统计单元中和总的统计的最频繁使用的前50个命令和最不常使用的50个命令的重合程度
    	#f1:统计单元中出现的命令类型数量
    	#f2:统计单元中最常使用的10个命令和总的最常使用的命令的重合程度
    	#f3:统计单元中最不常使用的10个命令和总的最不常使用的命令的重合程度
    	x=[f1,f2,f3]
    	user_cmd_feature.append(x)
	return user_cmd_feature

def get_label(filename,index=0):
	x=[]
	with open(filename) as f:
    	for line in f:
        	line=line.strip('\n')
        	x.append( int(line.split()[index]))
	return x

if __name__ == '__main__':    
	user_cmd_list,user_cmd_dist_max,user_cmd_dist_min=load_user_cmd("../data/MasqueradeDat/User3")
	user_cmd_feature=get_user_cmd_feature(user_cmd_list,user_cmd_dist_max,user_cmd_dist_min)
	#index=2 即为User3对应的label
	labels=get_label("../data/MasqueradeDat/label.txt",2)
	#前5000个记录为正常操作 即前50个序列为正常操作
	y=[0]*50+labels
	x_train=user_cmd_feature[0:N]
	y_train=y[0:N]
	x_test=user_cmd_feature[N:150]
	y_test=y[N:150]
	neigh = KNeighborsClassifier(n_neighbors=3)
	neigh.fit(x_train, y_train)
	y_predict=neigh.predict(x_test)
	score=np.mean(y_test==y_predict)*100
	print(y_test)
	print(y_predict)
	print(score)
	print(classification_report(y_test, y_predict))
	print(metrics.confusion_matrix(y_test, y_predict))

运行一下,可以看到准确率约为83.3%,结果不是很理想。

如果我们想提高我们的验证效果,我们需要更加全盘考虑异常操作的出现。因此接下来,我们使用全量比较,来重新对用户的是否异常进行训练。

在数据搜集和清洗的时候,我们使用dict数据结构,将全部命令去重之后形成一个大型的向量空间,每个命令代表一个特征,首先通过遍历全部命令生成对应的词集。因此我们重写load_user_cmd方法,如下:

def load_user_cmd_new(filename):
	cmd_list=[]
	dist=[]
	with open(filename) as f:
	    i=0
	    x=[]
	    for line in f:
	        line=line.strip('\n')
	        x.append(line)
	        dist.append(line)
	        i+=1
	        if i == 100:
	            cmd_list.append(x)
	            x=[]
	            i=0
	fdist = list(FreqDist(dist).keys())
	return cmd_list,fdist

得到词集之后,将命令向量化,方便模型的训练。因此再重写get_user_feature方法,重新获取用户特征。

def get_user_cmd_feature_new(user_cmd_list,dist):
	user_cmd_feature=[]
	for cmd_list in user_cmd_list:
    	v=[0]*len(dist)
    	for i in range(0,len(dist)):
    	    if dist[i] in cmd_list:
    	        v[i]+=1
    	user_cmd_feature.append(v)
	return user_cmd_feature

训练新模型和之前的方法一样。

效果验证的时候,使用交叉验证,通过十次随机取样和验证,提高验证的可信度:

score=cross_val_score(neigh,user_cmd_feature,y,n_jobs = -1,cv=10)

最后可以得到一个准确率约在93%左右的较好的模型。

除了以上的几个例子,使用KNN来进行webshell检测和rootkit检测的例子可以参考https://github.com/traviszeng/MLWithWebSecurity/tree/master/code/KNNsample