使用数据集:

# Weather training set. Each row is a tuple of
# (outlook, temperature, humidity, windy, play):
#   outlook     - discrete string: 'sunny', 'rainy' or 'overcast'
#   temperature - continuous float
#   humidity    - continuous float
#   windy       - discrete bool
#   play        - class label, 'yes' or 'no' (always the last element)
train = [
    ('sunny', 85.0, 85.0, False, 'no'),
    ('rainy', 75.0, 80.0, False, 'yes'),
    ('sunny', 75.0, 70.0, True, 'yes'),
    ('overcast', 72.0, 90.0, True, 'yes'),
    ('overcast', 81.0, 75.0, False, 'yes'),
    ('rainy', 71.0, 91.0, True, 'no'),
    ('sunny', 80.0, 90.0, True, 'no'),
    ('overcast', 83.0, 86.0, False, 'yes'),
    ('rainy', 70.0, 96.0, False, 'yes'),
    ('rainy', 68.0, 80.0, False, 'yes'),
    ('rainy', 65.0, 70.0, True, 'no'),
    ('overcast', 64.0, 65.0, True, 'yes'),
    ('sunny', 72.0, 95.0, False, 'no'),
    ('sunny', 69.0, 70.0, False, 'yes'),
]

当属性是离散的: 类的先验概率 = 该类样本数量 / 样本总数

类条件概率P ( Xi=xi | Y = yj )可以根据类yj中属性等于xi的训练实例的比例来估计

对于上述数据集:

先验概率 P(yes) = 9/14  P(no) = 5/14

P(sunny|yes) = 2/9    P(sunny|no) = 3/5

P(rainy|yes) = 3/9    P(rainy|no) = 2/5

P(overcast|yes) = 4/9    P(overcast|no) = 0

P(True|yes) = 3/9    P(True|no) = 3/5

P(False|yes) = 6/9    P(False|no) = 2/5

当属性是连续的,需要预先估计该样本的分布模板(如均匀分布,高斯分布等)

然后依照分布的概率密度函数计算得到概率

假设本文中的第一个连续变量均符合0-100间的均匀分布,则 P(X) = 0.01

假设本文中的第二个连续变量符合高斯分布 P(Y)

如果类为 yes   样本均值为:79.1    样本方差为:92.77

如果类为no     样本均值为:86.2    样本方差为:75.76

则假设测试数据为

('overcast', 89.0, 69.0, True)

P(yes)*P(overcast|yes)*P(True|yes)*P(X=89.0)*P(Y=69.0|yes) = 9/14 * 4/9 * 3/9 * 1/100 * 0.0239 ≈ 0.000023

P(no)*P(overcast|no)*P(True|no)*P(X=89.0)*P(Y=69.0|no) = 5/14 * 0 *….. = 0

因此 测试数据的类别为yes

import numpy
from math import pow, log
from math import e as E

# Weather training set. Each row is a tuple of
# (outlook, temperature, humidity, windy, play):
#   outlook     - discrete string: 'sunny', 'rainy' or 'overcast'
#   temperature - continuous float
#   humidity    - continuous float
#   windy       - discrete bool
#   play        - class label, 'yes' or 'no' (always the last element)
train = [
    ('sunny', 85.0, 85.0, False, 'no'),
    ('rainy', 75.0, 80.0, False, 'yes'),
    ('sunny', 75.0, 70.0, True, 'yes'),
    ('overcast', 72.0, 90.0, True, 'yes'),
    ('overcast', 81.0, 75.0, False, 'yes'),
    ('rainy', 71.0, 91.0, True, 'no'),
    ('sunny', 80.0, 90.0, True, 'no'),
    ('overcast', 83.0, 86.0, False, 'yes'),
    ('rainy', 70.0, 96.0, False, 'yes'),
    ('rainy', 68.0, 80.0, False, 'yes'),
    ('rainy', 65.0, 70.0, True, 'no'),
    ('overcast', 64.0, 65.0, True, 'yes'),
    ('sunny', 72.0, 95.0, False, 'no'),
    ('sunny', 69.0, 70.0, False, 'yes'),
]


class Bayesian_cluster:
    """Naive Bayes classifier for rows mixing discrete and continuous attributes.

    Every row of ``dataset`` is a sequence whose last element is the class
    label and whose preceding elements are attribute values.  Attributes
    whose value in the first row is a ``str`` or ``bool`` are treated as
    discrete and estimated by relative frequency; every other attribute is
    treated as continuous and fitted with one of the distributions listed
    in ``distribution_function``.

    Layout of ``model`` after ``train``::

        model[-1][label]                    prior P(label)
        model[i][label][value]              P(value | label), discrete column i
        model[i]['type']                    distribution name, continuous column i
        model[i][label]['avg' / 'variance'] fitted parameters, continuous column i
    """

    # Distribution name -> [name of the fitting method, name of the density method].
    # The key keeps its historical spelling ('Gaussion') because it is the
    # text users type at the prompt and the value stored in ``model``.
    distribution_function = {
        'Gaussion': ['train_gaussion', 'calc_gaussion']
    }

    def __init__(self, dataset):
        """Store the training rows and derive the label set and row count."""
        self.dataset = dataset
        # Number of attribute columns (everything except the trailing label).
        self.classes = len(dataset[0]) - 1
        # Distinct class labels found in the last column.
        self.cluster = {row[-1] for row in self.dataset}
        self.total = len(self.dataset)
        self.model = {}

    def _rows_of(self, classname):
        """Return the training rows whose label equals ``classname``."""
        # Equality, not identity: ``is`` only happened to work for interned strings.
        return [row for row in self.dataset if row[-1] == classname]

    def get_total(self):
        """Estimate the prior of every class and store it under ``model[-1]``."""
        priors = self.model.setdefault(-1, {})
        for classname in self.cluster:
            priors.setdefault(classname, len(self._rows_of(classname)) / float(self.total))

    def get_discrete(self, index):
        """Estimate P(value | class) for the discrete column ``index``.

        Every value seen anywhere in the column gets an entry for every
        class, so a value never observed within a class has probability 0.
        """
        column = self.model.setdefault(index, {})
        values = {row[index] for row in self.dataset}
        for classname in self.cluster:
            rows = self._rows_of(classname)
            table = column.setdefault(classname, {})
            for value in values:
                matches = len([row for row in rows if row[index] == value])
                table.setdefault(value, matches / float(len(rows)))

    def train_gaussion(self, index):
        """Fit a per-class Gaussian (mean and population variance) to column ``index``."""
        column = self.model.setdefault(index, {})
        column.setdefault('type', 'Gaussion')
        for classname in self.cluster:
            samples = numpy.array([float(row[index]) for row in self._rows_of(classname)])
            params = column.setdefault(classname, {})
            params.setdefault('avg', samples.mean())
            # ndarray.var() defaults to ddof=0, i.e. the population variance
            # E[x^2] - E[x]^2, computed without the cancellation error of that formula.
            params.setdefault('variance', samples.var())

    def calc_gaussion(self, testcase, index, classname):
        """Return the Gaussian density of ``testcase[index]`` under ``classname``.

        Uses the normal pdf ``exp(-(x - mu)^2 / (2 var)) / sqrt(2 pi var)``.
        """
        params = self.model[index][classname]
        variance = params['variance']
        deviation = float(testcase[index]) - params['avg']
        # The normalising constant is 1 / sqrt(2*pi*var); the earlier
        # 1 / (2*pi*sqrt(var)) under-estimated every density by sqrt(2*pi).
        return numpy.exp(-deviation ** 2 / (2 * variance)) / numpy.sqrt(2 * numpy.pi * variance)

    def get_series(self, index, distribution=None):
        """Fit the continuous column ``index`` with a named distribution.

        Args:
            index: Column position of the continuous attribute.
            distribution: Key of ``distribution_function`` to use.  When
                ``None`` the user is prompted on stdin until a known name
                is entered.

        Raises:
            ValueError: If ``distribution`` is given but not registered.
        """
        if distribution is None:
            distribution = ''
            while distribution not in self.distribution_function:
                distribution = input(
                    'For ' + str(index) + ' Which distribution ?' + str(list(self.distribution_function)))
        elif distribution not in self.distribution_function:
            raise ValueError('Unknown distribution: ' + repr(distribution))
        # Look the fitting method up by name instead of eval()-ing a built string.
        getattr(self, self.distribution_function[distribution][0])(index)

    def train(self, distributions=None):
        """Build the model: class priors plus one estimate per attribute column.

        Args:
            distributions: Optional mapping of column index to distribution
                name for continuous columns.  Columns missing from the
                mapping (or all of them when ``None``) are chosen
                interactively through ``get_series``.
        """
        self.get_total()
        chosen = distributions or {}
        for index in range(self.classes):
            if isinstance(self.dataset[0][index], (str, bool)):
                self.get_discrete(index)
            else:
                self.get_series(index, chosen.get(index))

    def calc(self, testcase):
        """Score ``testcase`` against every class.

        Args:
            testcase: Attribute values in training column order, without
                the label.

        Returns:
            A pair ``(scores, best)``: ``scores`` maps each class label to
            prior times the product of the per-attribute likelihoods;
            ``best`` is the label with the highest score, or ``''`` when no
            class has a score greater than zero.
        """
        result = {}
        for classname in self.cluster:
            score = self.model[-1][classname]
            for index, value in enumerate(testcase):
                if isinstance(value, (str, bool)):
                    # A value never seen in training has relative frequency 0.
                    score *= self.model[index][classname].get(value, 0.0)
                else:
                    name = self.model[index]['type']
                    density = getattr(self, self.distribution_function[name][1])
                    score *= density(testcase, index, classname)
            result[classname] = score
        best_label = ''
        best_score = 0
        for classname, score in result.items():
            if score > best_score:
                best_score = score
                best_label = classname
        return result, best_label


# b = Bayesian_cluster(train)
# b.train()
# table, result = b.calc(['rainy', 76.0, 95.0, True])
# print(result)
# print(table)
# print(b.model)

其中 distribution_function 用来记录分布方式和对应的估计,计算函数

这里只实现了高斯分布

可见输出:

For 1 Which distribution ?['Gaussion']Gaussion
For 2 Which distribution ?['Gaussion']Gaussion
no
{'yes': 7.255865454319437e-06, 'no': 2.0781765095687095e-05}

即 no 的概率高于 yes,所以分类为 no