项目中一般都会有数据统计模块,比如说,按天、月、年统计订单、会员的数量、金额等。对于这类需求,下面我将简单介绍一下我的技术方案,抛砖引玉。
一、如果项目小,数据量不大,就直接写SQL语句去实时统计;
二、数据库做集群,主从复制,在从库上做数据统计;
三、建一个数据汇总表,把统计数据写入这个表,然后统计报表从这个表去查询,这样性能就好很多了;
四、把数据同步到ElasticSearch这类分布式搜索和分析引擎,然后用ElasticSearch去做统计。数据同步,可以采用阿里巴巴的Canal工具,也可以在发生业务的时候推送到ElasticSearch。
关于方案三,数据写入汇总表,然后统计数据从汇总表去查询,这个可能是最常用的方案。那么,怎么把数据写入汇总表呢?这里介绍2种办法:
1、发生业务的时候,把数据写入汇总表。但这样做的不足之处是:旧的数据没有统计到;耦合度高,容易出bug。至于性能,则可以考虑异步处理。
2、写个Python之类的脚本来定时统计。
我喜欢的是用Python来做定时统计。Python很适合用来做这类工作,代码量绝对比Java少多了,这就是动态语言的厉害之处。Java这类语言,适合团队协作,多人开发,而Python这类语言很适合做一些小工具,更适合敏捷开发。代码例子如下:
#!/usr/bin/python3
# coding: utf-8
import argparse
import sys
import time
sys.path.append('..')
import datetime
import pymysql
import os
# 引入logging模块
import logging
# 注意安装命令是:pip3 install python-dateutil
from dateutil.relativedelta import relativedelta
# Root-logger setup: records at INFO and above go to a per-day log file named
# data_count<YYYY-MM-DD>.txt in the current working directory.
logging.basicConfig(
    # Line layout: pid, timestamp, source file, function, line number, level, message.
    format='%(process)d %(asctime)s %(filename)s %(funcName)s [line:%(lineno)d] %(levelname)s %(message)s',
    # 'a' appends to an existing log file; 'w' would truncate it on every run.
    filemode='a',
    filename=os.path.join(
        os.getcwd(),
        f"data_count{datetime.date.today().isoformat()}.txt",
    ),
    # Minimum severity that gets recorded.
    level=logging.INFO,
)
# Command-line switch for the target environment: "prod" for production,
# "test" for the test environment.
# Usage: data_count.py --active=test
parser = argparse.ArgumentParser(description='命令行参数测试')
parser.add_argument('--active', dest='active', default='', type=str)
args = parser.parse_args()
# Kept as a module-level name so the rest of the script can read it.
active = args.active
class dataCount(object):
    """Data-statistics job that rolls contract data up into ``statistics``.

    For every processed day the job makes sure a ``statistics`` row exists for
    that day's index (``YYYYMMDD``) and then stores the day's contract count
    and contract amount in it. It also corrects ``sys_user.gender`` values
    that disagree with the gender encoded in the user's ID-card number.

    Nothing is committed by this class: the caller owns the transaction and is
    expected to call ``conn_default.commit()`` and close the cursor/connection.
    """

    def __init__(self):
        """Open the MySQL connection and a dict cursor.

        The module-level ``active`` value picks the target: ``'prod'`` uses the
        production database, any other value uses the test database.
        """
        # NOTE(review): host names and credentials are hard-coded here; they
        # would be safer in configuration or environment variables.
        if active == 'prod':
            host, database = 'count-db.mysql.polardb.rds.aliyuncs.com', 'count_db'
        else:
            host, database = '192.168.0.8', 'test_db'
        self.conn_default = pymysql.connect(host=host,
                                            user='user', passwd='passwd',
                                            db=database,
                                            port=3306, charset='utf8',
                                            cursorclass=pymysql.cursors.DictCursor,
                                            connect_timeout=7200)
        self.cursor_default = self.conn_default.cursor()

    @staticmethod
    def _day_index(start_day: str) -> int:
        """Turn ``'YYYY-MM-DD'`` into the integer day index ``YYYYMMDD``.

        Raises:
            ValueError: if the value is not numeric once dashes are removed.
        """
        return int(start_day.replace('-', ''))

    @staticmethod
    def _day_bounds(start_day: str):
        """Return ``(start, end)`` timestamps delimiting ``start_day``.

        The range is half-open (``start <= t < end``, where ``end`` is midnight
        of the next day) so that timestamps carrying fractional seconds in the
        last second of the day are not lost.

        Raises:
            ValueError: if ``start_day`` is not a valid ``'YYYY-MM-DD'`` date.
        """
        day = datetime.datetime.strptime(start_day, "%Y-%m-%d")
        next_day = day + datetime.timedelta(days=1)
        fmt = "%Y-%m-%d %H:%M:%S"
        return day.strftime(fmt), next_day.strftime(fmt)

    def exist_day_index(self, start_day: str):
        """Make sure a non-deleted ``statistics`` row exists for ``start_day``.

        A row with zeroed counters is inserted when none is found.

        Args:
            start_day: day to check, formatted ``'YYYY-MM-DD'``.
        """
        day_index = self._day_index(start_day)
        self.cursor_default.execute(
            "select count(*) as count from statistics t "
            "where t.day_index = %s and t.del_flag = 0",
            (day_index,),
        )
        row = self.cursor_default.fetchone()
        if not row or row.get('count') == 0:
            self.cursor_default.execute(
                "insert into statistics(day_index, contract_num, contract_amount) "
                "values(%s, 0, 0)",
                (day_index,),
            )

    def get_start_day(self):
        """Return today's local date as a ``'YYYY-MM-DD'`` string."""
        return datetime.datetime.now().strftime("%Y-%m-%d")

    def get_gender(self, id_card: str):
        """Derive the gender code from an ID-card number.

        The sequence digit is the 17th character of an 18-character number and
        the last character of a legacy 15-character number; an odd digit means
        male, an even digit means female.

        Args:
            id_card: the ID-card number.

        Returns:
            ``0`` for male, ``1`` for female.

        Raises:
            ValueError: if the number is too short or the sequence character
                is not a digit.
        """
        if len(id_card) == 15:
            # Legacy 15-digit numbers keep the sequence digit at the very end.
            digit = id_card[14]
        else:
            # Empty when the string is shorter than 17 characters, which makes
            # int() raise ValueError below.
            digit = id_card[16:17]
        try:
            num = int(digit)
        except ValueError:
            raise ValueError(f"invalid ID-card number: {id_card!r}") from None
        return 1 if num % 2 == 0 else 0

    def count_contracts(self, start_day: str):
        """Count the day's non-deleted contracts and store the total.

        The total is always written, including ``0``, so that a re-run after
        contracts were removed does not leave a stale number behind.

        Args:
            start_day: day to aggregate, formatted ``'YYYY-MM-DD'``.
        """
        start_time, end_time = self._day_bounds(start_day)
        day_index = self._day_index(start_day)
        self.cursor_default.execute(
            "select count(*) as count from contract "
            "where create_date >= %s and create_date < %s and del_flag = 0",
            (start_time, end_time),
        )
        row = self.cursor_default.fetchone()
        num = (row or {}).get('count') or 0
        self.cursor_default.execute(
            "UPDATE statistics t SET t.contract_num = %s "
            "WHERE t.day_index = %s and t.del_flag = 0",
            (num, day_index),
        )

    def count_contract_amount(self, start_day: str):
        """Sum the day's non-deleted contract amounts and store the total.

        ``sum()`` yields NULL when no contract matches; that is stored as ``0``
        so the summary row always reflects the current source data.

        Args:
            start_day: day to aggregate, formatted ``'YYYY-MM-DD'``.
        """
        start_time, end_time = self._day_bounds(start_day)
        day_index = self._day_index(start_day)
        self.cursor_default.execute(
            "select sum(money) as money from contract "
            "where create_date >= %s and create_date < %s and del_flag = 0",
            (start_time, end_time),
        )
        row = self.cursor_default.fetchone()
        money = (row or {}).get('money')
        if money is None:
            money = 0
        self.cursor_default.execute(
            "UPDATE statistics t SET t.contract_amount = %s "
            "WHERE t.day_index = %s and t.del_flag = 0",
            (money, day_index),
        )

    def update_gender(self, delay: float = 1):
        """Fix ``sys_user.gender`` where it disagrees with the ID-card number.

        Rows whose ID-card number cannot be parsed are logged and skipped
        instead of aborting the whole run.

        Args:
            delay: seconds to pause after each corrective update, to throttle
                writes. Defaults to one second.
        """
        self.cursor_default.execute(
            "select sys_user.id, sys_user.gender, info_user.id_card "
            "from sys_user INNER JOIN info_user ON info_user.user_id = sys_user.id "
            "WHERE info_user.id_card is not null"
        )
        for row in self.cursor_default.fetchall():
            user_id = row.get('id')
            id_card = row.get('id_card')
            try:
                gender = self.get_gender(id_card)
            except ValueError:
                logging.warning("skip user %s: unparsable ID-card number", user_id)
                continue
            if row.get('gender') == gender:
                continue
            # NOTE(review): this writes the full ID-card number to the log
            # file; consider masking it.
            logging.info("身份证号码为%s的会员性别有误", id_card)
            # "gender != x" alone is never true for NULL, so a missing gender
            # would be reported on every run but never corrected.
            self.cursor_default.execute(
                "update sys_user set gender = %s "
                "where id = %s and (gender is null or gender != %s)",
                (gender, user_id, gender),
            )
            if delay > 0:
                time.sleep(delay)

    def loop_date_count(self, days: int):
        """Refresh the statistics for today and the preceding ``days - 1`` days.

        Args:
            days: number of days to process, counting back from today.
        """
        if days > 0:
            # The gender correction does not depend on the day being processed,
            # so it runs once per job instead of once per day.
            self.update_gender()
        begin = datetime.datetime.now()
        for offset in range(days):
            start_day = (begin - datetime.timedelta(days=offset)).strftime("%Y-%m-%d")
            self.exist_day_index(start_day)
            self.count_contracts(start_day)
            self.count_contract_amount(start_day)
if __name__ == '__main__':
    # Script entry point: refresh the statistics for the last 3 days inside a
    # single transaction.
    data_count = dataCount()
    try:
        data_count.loop_date_count(3)
        # Persist everything the job wrote.
        data_count.conn_default.commit()
    except Exception:
        # Leave no partial results behind and report the failure to the
        # scheduler through a non-zero exit status.
        logging.exception("data count failed, rolling back")
        data_count.conn_default.rollback()
        sys.exit(1)
    finally:
        # Always release the cursor and the connection, even on failure.
        data_count.cursor_default.close()
        data_count.conn_default.close()
    logging.info("已经完成了")
    # Exit status 0 signals success; the previous status 1 made every
    # successful run look like a failure to cron or any calling process.
    sys.exit(0)
怎么样?很简单吧。
Linux操作系统一般都自带Python,新的系统普遍都是Python3了。直接在服务器上做个定时任务,每天执行,就完事了。当然,也可以通过Java执行shell命令的方式去运行Python脚本。办法有很多。