电脑上按Ctrl + D,下次访问更方便
服务分类

Java项目,做数据统计的技术方案

项目中一般都会有数据统计模块,比如说,按天、月、年统计订单、会员的数量、金额等。对于这类需求,下面我将简单介绍一下我的技术方案,抛砖引玉。

一、如果项目小,数据量不大,就直接写SQL语句去实时统计;
二、数据库做集群,主从复制,在从库上做数据统计;
三、建一个数据汇总表,把统计数据写入这个表,然后统计报表从这个表去查询,这样性能就好很多了;
四、把数据同步到ElasticSearch这类分布式搜索和分析引擎,然后用ElasticSearch去做统计。数据同步,可以采用阿里巴巴的Canal工具,也可以在发生业务的时候推送到ElasticSearch。

关于方案三,数据写入汇总表,然后统计数据从汇总表去查询,这个可能是最常用的方案。那么,怎么把数据写入汇总表呢?这里介绍2种办法:
1、发生业务的时候,把数据写入汇总表。但这样的不好之处是:旧的数据没有统计到;耦合度高,容易bug。至于性能,则可以考虑异步处理。
2、写个Python这类脚本来定时统计。

我喜欢的是用Python来做定时统计。Python很适合用来做这类工作,代码量绝对比Java少多了,这就是动态语言的厉害之处。Java这类语言,适合团队协作,多人开发,而Python这类语言很适合做一些小工具,更适合敏捷开发。代码例子如下:

# coding: utf-8
# !/usr/bin/python3
import argparse
import sys
import time

sys.path.append('..')
import datetime
import pymysql
import os
# 引入logging模块
import logging
# 注意安装命令是:pip3 install python-dateutil
from dateutil.relativedelta import relativedelta


logging.basicConfig(level=logging.INFO,  # 控制台打印的日志级别
                    filename=os.path.join(os.getcwd(), 'data_count' + datetime.date.today().strftime("%Y-%m-%d") + '.txt'),
                    filemode='a',  ##模式,有w和a,w就是写模式,每次都会重新写日志,覆盖之前的日志
                    # a是追加模式,默认如果不写的话,就是追加模式
                    format='%(process)d %(asctime)s %(filename)s %(funcName)s [line:%(lineno)d] %(levelname)s %(message)s'
                    # 日志格式
                    )
# active参数,正式环境prod,测试环境test 用法:data_count.py --active=test
parser = argparse.ArgumentParser(description='命令行参数测试')
parser.add_argument('--active', type=str, default='')
args = parser.parse_args()
active = args.active

"""
这个程序的作用是:数据统计
"""
class dataCount(object):
    def __init__(self):
        if active == 'prod':
            self.conn_default = pymysql.connect(host='count-db.mysql.polardb.rds.aliyuncs.com',
                                                user='user', passwd='passwd',
                                                db='count_db',
                                                port=3306, charset='utf8', cursorclass=pymysql.cursors.DictCursor,
                                                connect_timeout=7200)
        else:
            self.conn_default = pymysql.connect(host='192.168.0.8',
                                                user='user', passwd='passwd',
                                                db='test_db',
                                                port=3306, charset='utf8', cursorclass=pymysql.cursors.DictCursor,
                                                connect_timeout=7200)

        self.cursor_default = self.conn_default.cursor()

    # 检查日序列是否存在
    def exist_day_index(self, start_day: str):
        day_index = start_day.replace('-', '')
        sql = f"""
        select count(*) as count from statistics t where t.day_index = {day_index} and t.del_flag = 0
        """
        self.cursor_default.execute(sql)
        row = self.cursor_default.fetchone()
        if not row or row.get('count') == 0:
            sql = f"""
            insert into statistics(day_index, contract_num, contract_amount)
        values({day_index}, 0, 0)
            """
            self.cursor_default.execute(sql)


    def get_start_day(self):
        now = datetime.datetime.now()
        return now.strftime("%Y-%m-%d")

    # 根据身份证号码获取性别
    def get_gender(self, id_card: str):
        # 男:0 女:1
        num = int(id_card[16:17])
        if num % 2 == 0:
            return 1
        else:
            return 0

    # 统计合同总数
    def count_contracts(self, start_day: str):
        start_time = start_day + ' 00:00:00'
        end_time = start_day + ' 23:59:59'
        day_index = start_day.replace('-', '')
        sql = f"""
        select count(*) as count from contract where create_date >= '{start_time}' and create_date <= '{end_time}'
        and del_flag = 0
        """
        self.cursor_default.execute(sql)
        row = self.cursor_default.fetchone()
        num = row.get('count')
        if num and num > 0:
            sql = f"""
                            UPDATE statistics t
                        SET t.contract_num = {num}
                        WHERE t.day_index = {day_index} and t.del_flag = 0
                                """
            self.cursor_default.execute(sql)

    # 统计合同金额
    def count_contract_amount(self, start_day: str):
        start_time = start_day + ' 00:00:00'
        end_time = start_day + ' 23:59:59'
        day_index = start_day.replace('-', '')
        sql = f"""
        select sum(money) as money from contract where create_date >= '{start_time}' and create_date <= '{end_time}'
        and del_flag = 0
        """
        self.cursor_default.execute(sql)
        row = self.cursor_default.fetchone()
        money = row.get('money')
        if money and money > 0:
            sql = f"""
                            UPDATE statistics t
                        SET t.contract_amount = {money}
                        WHERE t.day_index = {day_index} and t.del_flag = 0
                                """
            self.cursor_default.execute(sql)

    # 更新性别
    def update_gender(self):
        sql = """
        select sys_user.id, sys_user.gender, info_user.id_card from sys_user INNER JOIN info_user ON info_user.user_id = sys_user.id
            WHERE info_user.id_card is not null;
        """
        self.cursor_default.execute(sql)
        rows = self.cursor_default.fetchall()
        for row in rows:
            id = row.get('id')
            id_card = row.get('id_card')
            gender_database = row.get('gender')
            # 计算性别
            gender = self.get_gender(id_card)
            if gender_database != gender:
                logging.info(f"身份证号码为{id_card}的会员性别有误")
                sql = f"""
                update sys_user set gender  = {gender} where id = {id} and gender != {gender}
                """
                self.cursor_default.execute(sql)
                # 延迟1秒
                time.sleep(1)


    # 循环日期去统计
    def loop_date_count(self, days: int):
        begin = datetime.datetime.now()
        for i in range(0, days):
            day = begin + datetime.timedelta(days=-i)
            start_day = day.strftime("%Y-%m-%d")
            # 更新错误的性别
            self.update_gender()
            self.exist_day_index(start_day)
            # 统计合同总数
            self.count_contracts(start_day)
            # 统计合同金额
            self.count_contract_amount(start_day)


if __name__ == '__main__':
    data_count = dataCount()
    # 任务相关统计
    data_count.loop_date_count(3)
    # start_day = data_count.get_start_day()
    # 数据库提交
    data_count.conn_default.commit()
    # 游标关闭
    data_count.cursor_default.close()
    # 连接关闭
    data_count.conn_default.close()
    logging.info("已经完成了")
    sys.exit(1)

怎么样?很简单吧。

Linux操作系统一般都自带Python,新的系统都普遍是Python3了。直接在服务器上做个定时任务,每天执行,就完事了。当然,也可以Java去执行shell命令的方式去运行Python。办法有很多。

发表回复

登录后才能评论
联系我们

联系我们

微信客服:

fuwu360微信客服

工作时间:周一至周五,9:30-18:30,节假日休息

返回顶部