Commit dde629b1 by 郑博

Initial commit

# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/.venv" />
</content>
<orderEntry type="jdk" jdkName="Remote Python 3.6.3 (sftp://root@192.168.1.99:22/root/hadoop-env/bin/python3.6)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="PyDocumentationSettings">
<option name="format" value="PLAIN" />
<option name="myDocStringFormat" value="Plain" />
</component>
</module>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="PublishConfigData" autoUpload="Always" serverName="root@192.168.1.99:22 password" remoteFilesAllowedToDisappearOnAutoupload="false">
<serverData>
<paths name="root@192.168.1.99:22 password">
<serverdata>
<mappings>
<mapping deploy="/home/battery_health_new" local="$PROJECT_DIR$" />
</mappings>
</serverdata>
</paths>
</serverData>
<option name="myAutoUpload" value="ALWAYS" />
</component>
</project>
\ No newline at end of file
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Black">
<option name="sdkName" value="Remote Python 3.6.3 (sftp://root@192.168.1.99:22/root/hadoop-env/bin/python3.6)" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Remote Python 3.6.3 (sftp://root@192.168.1.99:22/root/hadoop-env/bin/python3.6)" project-jdk-type="Python SDK" />
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/battery_health_new.iml" filepath="$PROJECT_DIR$/.idea/battery_health_new.iml" />
</modules>
</component>
</project>
\ No newline at end of file
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import sys
import battery_health
import findspark
findspark.init()
print("当前Python路径:", sys.executable)
sys.path.append('/home/hadoop/spark3.2.4/python')
# sys.path.append('/usr/spark/spark-2.4.6-bin-hadoop2.7/python')
# sys.path.append('C:/tools/spark-2.4.8-bin/python')
# sys.path.append('/usr/lib/spark-current/python')
import os
os.environ['JAVA_HOME'] = '/home/soft/jdk1.8.0_271'
# os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3.6'
os.environ['PYSPARK_PYTHON'] = '/root/hadoop-env/bin/python'
# Configuration directories to use when debugging remotely
os.environ["HADOOP_CONF_DIR"] = "/home/hadoop/hive3.1.3/conf"
os.environ["SPARK_CONF_DIR"] = "/home/hadoop/spark3.2.4/conf"
from pyspark import SparkConf
from pyspark.sql import SparkSession
spark_conf = SparkConf()
spark_conf.setAppName("huafon-battery-health")
spark_conf.setMaster("local[*]")
spark_conf.set("spark.driver.memory", "60g")
spark_conf.set("spark.executor.memory","10g") # 每个executor的内存
#spark_conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
spark_conf.set("spark.sql.crossJoin.enabled", "true")
spark_conf.set("spark.execution.arrow.enabled", "true") # 启用Arrow数据,加速计算
spark_conf.set("spark.shuffle.service.enabled", "true")
spark_conf.set("spark.storage.memoryFraction", "0.6") # 设置储存因子,即储存用的内存占已分配内存的0.6
# Hive dynamic-partition settings
spark_conf.set("hive.exec.dynamic.partition", "true")
spark_conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
spark_conf.set("hive.exec.max.dynamic.partitions", "1000")
#spark_conf.set("spark.driver.maxResultSize", "4g") # 调试用,设置结果大小为2048MB
# spark_conf.setMaster("local[*]")
# spark_conf.set("spark.logConf", "false")
# spark_conf.set("spark.dynamicAllocation.enabled", "true") # 设置动态分配内存开关
# spark_conf.set("spark.sql.broadcastTimeout", "300") # 设置最大广播超时时间
# spark_conf.set("spark.default.parallelism", "500") # 设置最大并行数目
# spark_conf.set("spark.driver.maxResultSize", "4g")
# spark_conf.set("spark.sql.queryExecutionListeners", "")
# spark_conf.set('spark.sql.execution.arrow.enabled', 'true')
# spark_conf.set("spark.scheduler.mode", "FAIR") # 设置公平调度模式
spark = SparkSession.builder.config(conf=spark_conf).enableHiveSupport().getOrCreate()
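# Downstream modules reuse this session via "from battery_health import spark".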
#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""
This file records the pid_name and dev_id corresponding to each device of every station.
Note when adding entries:
parameter types in `details` must be consistent across stations; for example 'vol_lower' must be a double
for every station, not an int at some stations and a double at others.
"""
# Stations currently in use
# station_list = ['xmei', 'hnc']
station_list = ['hnc']
#station_list = ['shtb']
# pid names to be written into InfluxDB
pid_name_list = ['RealSOH', 'TheorySOH', 'UsedRecycleTimes', 'RemainingRecycleTimes', 'RemainingRecycleDays',
'ForecastProfitMonth', 'BatteryMileageAmount', 'BatteryEfficiencyDay', 'AvgFullCharge',
'AvgFullDischarge', 'BatteryMileageDay', 'ChargeEnergyRetention', 'DischargeEnergyRetention',
'ChargeEndSOC', 'DischargeEndSOC', 'ChargeEndMaxVoltDiff', 'DischargeEndMaxVoltDiff',
'ChargeEndVoltSTD', 'DischargeEndVoltSTD', 'ChargeEndVoltageDiff', 'DischargeEndVoltageDiff',
'ChargeEndVoltageDeviation', 'DischargeEndVoltageDeviation', 'HealthScore', 'MaxCellTempRange']
# Parameters not yet finalized
consistency_weights = [0.1, 0.3, 0.6]
consistency_score_weights = {'temp_weight': 0.5, 'vol_weight': 0.5}
health_score_weights = {'vol_cons': 0.3, 'temp_cons': 0.15, 'cap_cons': 0.5, 'res_cons': 0.05}
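# Presumed usage (assumption; the exact formula lives in the health-score module):
# health ~= 0.3 * vol_cons + 0.15 * temp_cons + 0.5 * cap_cons + 0.05 * res_cons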
cell_health_range = 3
# cell_health_range = 1
general_map = {'hnc': {'id': {'enu': {'dev_id': ['628234'], 'pid_name': ['SystemLoad', 'ActivePower']},
'batu': {'dev_id': ['628607'], 'pid_name': []},
'batc': {'dev_id': ['628241', '628422', '628568', '628608',
'628656', '629330', '629967', '629968']},
'PCS': {'dev_id': ['628549'], 'pid_name': ['DcActivePower']},
'cell': {}},
'details': {'station_id': '628421',
# 'full_charge_index': 85,
'full_charge_index': 40,
'theory_decay_ration': -25,
'power_freq_change': '2019-01-31 23:59:00', 'sample_freq_former': 15 * 60,
'sample_freq_latter': 1 * 10, 'cell_max_capacity': 1215.6, 'profit_per_kwh': 0.36,
'standard_capacity': 2334, 'motion_time': '2021-07-10', 'total_discharge': 3000000,
'threshold': 5, 'freq': '10s', 'charge_energy_pid': '11152001400284',
'discharge_energy_pid': '11152001400286', 'forecast_time_gap': 180, 'percent': 5,
'cell_total': 1920, 'soc_diff_filter': 60, 'stop_soh': 50,
'ChargeEndSOC': 80, 'DischargeEndSOC': 10,
'vol_lower': 1.0, 'vol_upper': 5.0, 'temp_lower': 10.0, 'temp_upper': 60.0,
'cell_soc_flag': 1, 'theory_info': {'theory_cycles': 4000, 'theory_stop_soh': 80},
'charge_stop_vol': 3.6, 'discharge_stop_vol': 2.8,
'energy_zero_date': '2024-01-01',}
},
'xmei': {'id': {'enu': {'dev_id': ['600906'], 'pid_name': ['SystemLoad', 'ActivePower']},
'batu': {'dev_id': ['601052'], 'pid_name': []},
'batc': {'dev_id': ['601051', '908342']},
'cell': {}},
'details': {'station_id': '580828', 'display_time': '2022-06-17', 'full_charge_index': 70,
'discharge_energy_pid': '1114999998508', 'threshold': 5,
'charge_energy_pid': '1114999998507', 'power_freq_change': '2019-01-31 23:59:00',
'sample_freq_former': 1, 'sample_freq_latter': 1,
'profit_per_kwh': 0.36, 'standard_capacity': 422, 'motion_time': '2022-05-10',
'total_discharge': 3000000, 'init_display': 115559.0,
'forecast_time_gap': 360, 'theory_decay_ration': -25, 'unit_time': '1S',
'percent': 5, 'cell_total': 480, 'cell_max_capacity': 883.2, 'stop_soh': 50,
'vol_diff_threshold': 0, 'soc_diff_filter': 50, 'freq': '10s',
'vol_lower': 1.0, 'vol_upper': 5.0, 'temp_lower': 10.0, 'temp_upper': 60.0,
'ChargeEndSOC': 80, 'DischargeEndSOC': 20,
'cell_soc_flag': 0, 'theory_info': {'theory_cycles': 4000, 'theory_stop_soh': 80},
'charge_stop_vol': 3.6, 'discharge_stop_vol': 2.8}
},
'shtb': {'id': {'enu': {'dev_id': ['21202'], 'pid_name': ['SystemLoad', 'ActivePower']},
'batu': {'dev_id': ['21145', '23180', '23112', '23053'], 'pid_name': []},
'batc': {'dev_id': ['23054', '23026', '23027', '23249',
'23250', '23163', '23251', '23164']},
'cell': {}},
'details': {'station_id': '20259', 'power_freq_change': '2019-01-31 23:59:00', 'threshold': 5,
'sample_freq_former': 15 * 60, 'sample_freq_latter': 1 * 10, 'full_charge_index': 94,
'forecast_time_gap': 360, 'theory_decay_ration': -25, 'unit_time': '10S',
'percent': 5, 'cell_total': 1856, 'stop_period': ['2019-12-04', '2020-09-30'],
'vol_diff_threshold': 0, 'profit_per_kwh': 0.36, 'standard_capacity': 1100,
'motion_time': '2019-01-01', 'total_discharge': 3000000, 'stop_soh': 40,
'cell_max_capacity': 750.0, 'soc_diff_filter': 60, 'freq': '10s',
'discharge_energy_pid': '100010043011128', 'cell_soc_flag': 0,
'theory_info': {'theory_cycles': 2000, 'theory_stop_soh': 60},
'charge_energy_pid': '100010043011126', 'energy_zero_date': '2019-04-02',
'ChargeEndSOC': 90, 'DischargeEndSOC': 10,
'vol_lower': 1.0, 'vol_upper': 5.0, 'temp_lower': 15.0, 'temp_upper': 45.0,
'charge_stop_vol': 3.6, 'discharge_stop_vol': 2.8}
}
}
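# Keep only the stations currently enabled in station_list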
general_map = {i:general_map[i] for i in station_list}
dev_id_list = []
for i in general_map.keys():
for j in ['enu', 'batu', 'batc']:
extra_ids = general_map[i]['id'][j]['dev_id']
dev_id_list += extra_ids
station_id = [general_map[i]['details']['station_id'] for i in general_map.keys()]
station_id = str(station_id).replace('[', '(').replace(']', ')')
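# e.g. ['628421'] becomes "('628421')", presumably for use in SQL "station_id in (...)" filters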
# Base tables
data_table = 'prod.ods_data_meas_hs'
pid_table = 'prod.dwb_dev_pid_meta_h'
rel_table = 'prod.dwb_dev_relation_h'
# Intermediate tables
state_table = 'prod.dwb_dev_state_dd'
cell_base_table = "prod.dwb_cell_base_data"
cell_capacity_table = "prod.dwb_cell_capacity_dd"
cell_capacity_conf_table = "prod.dwb_cell_capacity_conf_dd"
dev_table = "prod.dwb_dev_data_dd" # 中间过程表
cell_health_1 = "prod.dm_cell_health_dd"
cell_health_table = "prod.dm_cell_health_second_dd"  # cell health-score table
batu_health_table = "prod.dm_batu_health_second_dd"  # battery-unit health-score table
cap_table = "prod.dm_dev_capacity_dd"  # device energy/capacity table
cell_lithium_table = "prod.dwb_cell_lithium_dd"  # cell lithium-plating table
cell_resistance_table = "prod.dwb_cell_resistance_dd"  # cell internal-resistance table
cell_circuit_table = "prod.dwb_cell_micro_circuit_dd"  # cell micro-short-circuit table
daily_batu_health_table = "prod.dwb_daily_batu_health_dd"  # daily battery-unit health-score table
end_record_table = "prod.dwb_dev_endRecord_dd"  # device charge/discharge end-of-segment record table
# state_table = 'cx.dwb_dev_state_dd'
# cell_base_table = "cx.dwb_cell_base_data"
# cell_capacity_table = "cx.dwb_cell_capacity_10_dd"
# cell_capacity_conf_table = "cx.dwb_cell_capacity_conf_dd"
# dev_table = "cx.dwb_dev_data_dd"
# cell_health_1 = "cx.dm_cell_health_dd"
# cell_health_table = "cx.dm_cell_health_second_dd"
# batu_health_table = "cx.dm_batu_health_second_dd"
# cap_table = "cx.dm_dev_capacity_dd"
# cell_lithium_table = "cx.dwb_cell_lithium_dd"
# cell_resistance_table = "cx.dwb_cell_resistance_dd"
# cell_circuit_table = "cx.dwb_cell_micro_circuit_dd"
# daily_batu_health_table = "cx.dwb_daily_batu_health_dd"
influxdb_table = "prod.app_influxdb_data"
batu_tempCons_table = "prod.dm_batu_tempCons_dd"  # battery-unit temperature-consistency table
battery_operation_table = "prod.dm_capacity_estimate_dd"  # battery O&M capacity-estimate table
# statistic_mid_table = "prod.dwb_statistic_mid_dd"  # newly enabled working table for the detail page (statistics)
statistic_mid_table = "prod.dwb_statistic_interm_dd"
# Result tables
health_data = "test.health_data"
health_score = "test.health_score"
soh_table = "test.decay_data"
# Tables not yet in use
cell_ica_table = "prod.dwb_cell_ica_data"  # cell ICA table
voltage_exception_table = "prod.dwb_exception_record_dd"  # cell abnormal-voltage record table
discharge_energy_table = "prod.dwb_dev_dce_dd"
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from battery_health import data_load, configure, udf_collections
import pyspark.sql.functions as F
import pyspark
import time
import pyspark.sql.types as t
import pandas as pd
from dateutil.relativedelta import relativedelta
from pyspark.sql import Window
from battery_health.functions.data_verification import data_check
import requests
from battery_health.functions.statistic_data import statistic_data_prep, statistic_calculation
from battery_health.functions.health_score import health_score_data_prep
from battery_health import spark
def sava_to_hive(date):
# create_risk_data(date=date)
# save_data = create_capacity_soh(date=date)
health_data = data_load.get_data_general('test.health_data_tmp', date=date)
pid_data = data_load.get_dev_pid(targets=['pid', 'dev_id', 'pid_name'],
tier=['BatteryCluster', 'BatteryUnit', 'Storage', 'Cell'])
hive_data = health_data.join(pid_data, on=['dev_id', 'pid_name'], how='left')\
.filter(F.col('station').isin(configure.station_list))
hive_data = hive_data.withColumn('dt', F.lit(date)).select(['pid', 'times', 'val', 'dt', 'station']).dropna()
    # Clear the current day's 00:00 data for the pids that are about to be written
return hive_data
def empty_influxdb(date, pid_name_list, station=None):
struct_time = time.strptime(date, '%Y-%m-%d')
date_s = str(int(time.mktime(struct_time))*1000000000)
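    # Epoch time for midnight of the given date, in nanoseconds (InfluxDB's default time precision)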
dev_pid = data_load.get_dev_pid(pid_name=pid_name_list, station_id_list='580828',
tier=['BatteryCluster', 'BatteryUnit', 'Storage', 'Cell'])
dp = dev_pid.toPandas()
for i in range(10):
print('round:', i)
dp_ = dp[i::10]
dp_list = dp_.pid.to_list()
#dp_list = ['11159999938769','11159999938775','11159999938774','11159999952235']
        s = str(dp_list).replace('[', '%28pid%3D%27').replace(',', '%27+or+pid%3D%27').replace("'", '') \
            .replace(' ', '').replace(']', '%27%29+and+time%3D') + date_s
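        # Decoded, `s` reads roughly: (pid='...' or pid='...') and time=<midnight in ns>,
        # which the GET requests below turn into: delete from data_meas_XXXX_1d where <s>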
headers = {
'Accept': 'application/csv',
}
station_nmub_dict = {'xmei':'1114','hnc':'1115'}
if station is None:
response = requests.get(
# 'https://ts-bp1ue44w0814t8cx9.influxdata.tsdb.aliyuncs.com:8086/query?db=sgool&u=huafeng&p=Huafeng@2022&precision=ms&q=delete+from+data_meas_1114_1d+where+{}'.format(s),
'http://192.168.1.99:48088/query?db=sgool&u=yunhe&p=yunhe2022&precision=ms&q=delete+from+data_meas_1114_1d+where+{}'.format(s),
headers=headers,
)
print('round:', i, 'xmei delete finished')
response = requests.get(
# 'https://ts-bp1ue44w0814t8cx9.influxdata.tsdb.aliyuncs.com:8086/query?db=sgool&u=huafeng&p=Huafeng@2022&precision=ms&q=delete+from+data_meas_1115_1d+where+{}'.format(s),
'http://192.168.1.99:48088/query?db=sgool&u=yunhe&p=yunhe2022&precision=ms&q=delete+from+data_meas_1115_1d+where+{}'.format(s),
headers=headers,
)
print('round:', i, 'hnc delete finished')
else:
response = requests.get(
# 'https://ts-bp1ue44w0814t8cx9.influxdata.tsdb.aliyuncs.com:8086/query?db=sgool&u=huafeng&p=Huafeng@2022&precision=ms&q=delete+from+data_meas_{station_numb}_1d+where+{s}'
# .format( s=s, station_numb=station_nmub_dict[station]),
'http://192.168.1.99:48088/query?db=sgool&u=yunhe&p=yunhe2022&precision=ms&q=delete+from+data_meas_{station_numb}_1d+where+{s}'.format( s=s, station_numb=station_nmub_dict[station]),
headers=headers,
)
print('round:', i, station, 'delete finished')
print('x')
def empty_table(table, station, date):
try:
spark.sql("alter table {table} drop partition(station='{station}', dt='{date}')"
.format(table=table, station = station, date = date))
except pyspark.sql.utils.AnalysisException:
print('partition(station={station}, dt={date}) does not exist'.format(station = station, date = date))
def statistic_repair(date, station, empty=True):
    # Recompute the detail-page intermediate table
empty_table(table=configure.statistic_mid_table, station=station, date=date)
mid_data = statistic_data_prep.data_prep_station(date).filter(F.col('station')==station)
mid_data.write.format('hive').insertInto(configure.statistic_mid_table, overwrite=True)
    # Recompute the detail-page data
empty_table(table='test.health_data_tmp', station=station, date=date)
data = statistic_calculation.get_statistic(date=date, cycles=1).filter(F.col('station')==station)
data.write.format('hive').insertInto('test.health_data_tmp', overwrite=True)
    # Aggregate the data; the results are stored in the app_influxdb_data table under the cx schema
empty_table(table='cx.app_influxdb_data', station=station, date=date)
hive_data = sava_to_hive(date).filter(F.col('station') == station)
hive_data.write.format('hive').insertInto('cx.app_influxdb_data', overwrite=True)
    # Clear the corresponding data in InfluxDB
if empty:
pid_name_list = ['ChargeEndSOC', 'DischargeEndSOC', 'ChargeEndMaxVoltDiff', 'DischargeEndMaxVoltDiff',
'ChargeEndVoltSTD', 'DischargeEndVoltSTD', 'ChargeEndVoltageDiff', 'DischargeEndVoltageDiff',
'ChargeEndVoltageDeviation', 'DischargeEndVoltageDeviation']
empty_influxdb(date, pid_name_list, station)
if __name__ == '__main__':
print('x')
#!/usr/bin/env python
# -*- coding:utf-8 -*-
\ No newline at end of file
from battery_health import main_process
def t():
main_process.test()
\ No newline at end of file
#!/usr/bin/env python
# encoding: utf-8
from battery_health import configure, data_load
import pyspark.sql.functions as F
class DataVolumeError(Exception):
def __init__(self,merge_data):
self.merge_data = merge_data
pass
def check_dev_data(data):
    # Check the data volume, keyed by SOC
data = data.select(['soc', 'station']).drop_duplicates()
data = data.toPandas()
for i in data.station.unique():
if len(data[data['station']==i])<len(configure.dev_id_list):
print('station:', i, 'Insufficient Data Volume!')
# raise DataVolumeError('Insufficient Data Volume!')
def check_state_data(state):
    # Check the row count of the state table, keyed by state
state = state.select(['state']).drop_duplicates()
st = state.toPandas()
    return 1 if len(st) < 1 else 0
def check_cell_number(data):
    # Check the number of cells with data
data = data.toPandas()
for i in data.station.unique():
d = data[data['station']==i]
actual = len(d.cell_id.unique())
installed = configure.general_map[i]['details']['cell_total']
if actual != installed:
print('station:',i)
print('An abnormal number of cells was detected:')
print(installed,'cells installed, but',actual,'cells have data!')
def check_cell_data(data, col_name):
    # Validate voltage / temperature data against the configured bounds
lower_range = data_load.get_index('{}_lower'.format(col_name))
upper_range = data_load.get_index('{}_upper'.format(col_name))
data_range = lower_range.join(F.broadcast(upper_range), on=['station'], how='left')
data = data.join(F.broadcast(data_range), on=['station'], how='left').filter(
(F.col(col_name) > F.col('{}_lower'.format(col_name))) & (F.col(col_name) < F.col('{}_upper'.format(col_name))))\
.drop('{}_lower'.format(col_name)).drop('{}_upper'.format(col_name))
return data
def check_soc_data(data, col_name):
    # Filter end-of-charge/discharge records against the configured SOC thresholds
charge_range = data_load.get_index('ChargeEndSOC')
discharge_range = data_load.get_index('DischargeEndSOC')
data_range = charge_range.join(F.broadcast(discharge_range), on=['station'], how='left')
data = data.join(F.broadcast(data_range), on=['station'], how='left')
charge_data = data.filter((F.col('state')=='charge') & (F.col(col_name)>F.col('ChargeEndSOC')))
discharge_data = data.filter((F.col('state')=='discharge') & (F.col(col_name)<F.col('DischargeEndSOC')))
merge_data = charge_data.unionByName(discharge_data).drop('ChargeEndSOC').drop('DischargeEndSOC')
return merge_data
\ No newline at end of file
#!/usr/bin/env python
# -*- coding:utf-8 -*-
'''
Compute SOH-related indicators, forecast them with linear regression, and derive several station-level metrics.
Targets: [actual SOH, theoretical SOH, profit, daily unit mileage, total unit mileage, battery efficiency,
charge retention, discharge retention, remaining life, remaining charge cycles, used cycles,
average full-charge energy, average full-discharge energy.]
'''
\ No newline at end of file
#!/usr/bin/python
# -*- coding: utf-8 -*-
import time
import pandas as pd
from battery_health.functions.derive import derive
from battery_health import spark
def calculate_derive_index(date):
    # Aggregate each device's data (energy and state) across the different charge/discharge segments
cap_data = derive.cache_capacity_soh(date=date)
    # Filter the discharge data according to each station's operating conditions
capacity_soh = derive.filter_soc_by_station(cap_data)
capacity_soh_pdf = capacity_soh.toPandas()
    # Capacity and SoH are calculated with the discharge mode as the reference
dis_capacity_soh = capacity_soh_pdf[capacity_soh_pdf.state == 'discharge']
ch_capacity_soh = capacity_soh_pdf[capacity_soh_pdf.state == 'charge']
dev_charge_cap = derive.calculate_capacity(capacity_soh_pdf=ch_capacity_soh, switch=True)
dev_discharge_cap = derive.calculate_capacity(capacity_soh_pdf=dis_capacity_soh, switch=True)
    dev_soh_ = derive.calculate_soh(dev_capacity=dev_discharge_cap, date=date)  # compute the device SoH
print("----------------dev_soh_ show:",dev_soh_)
ch_soh = derive.calculate_soh(dev_capacity=dev_charge_cap, date=date)
print("----------------ch_soh show:",ch_soh)
    # Charge/discharge energy retention of the energy unit
enu_ch_ren = derive.calculate_retention(data=ch_capacity_soh)
enu_dis_ren = derive.calculate_retention(data=dis_capacity_soh)
    # Battery-unit mileage
batu_mil = derive.calculate_batu_mileage(dev_discharge_data=dev_discharge_cap, date=date)
batu_mil['pid_name'] = 'BatteryMileageAmount'
daily_mil = derive.calculate_daily_mileage(dev_discharge_data=dev_discharge_cap, date=date)
daily_mil['pid_name'] = 'BatteryMileageDay'
daily_mil = daily_mil[daily_mil['dt'] == date]
    # Used cycles, remaining cycles and theoretical SoH
charge_energy, discharge_energy = derive.get_energy_data()
used_cycles, remainder_cycles, theory_soh = derive.calculate_remainder_cycles(discharge_energy, soh_data=dev_soh_,
date=date)
    # Energy-unit efficiency
enu_efficiency = derive.calculate_efficiency(charge_soh=ch_soh, discharge_soh=dev_soh_)
    # Full-charge / full-discharge energy
enu_full_charge = derive.get_full_energy(data=enu_efficiency, state='charge')
enu_full_discharge = derive.get_full_energy(data=enu_efficiency, state='discharge')
enu_full_charge = enu_full_charge[['dev_id', 'charge_full', 'dt', 'station']]
enu_full_discharge = enu_full_discharge[['dev_id', 'discharge_full', 'dt', 'station']]
    # Forecast SoH
forecast_soh, remainder_days = derive.calculate_forecast_soh(data=dev_soh_, col='discharge_soh', date=date)
    # Forecast profit
forecast_profit = derive.calculate_forecast_profit(charge_energy_df=charge_energy,
discharge_energy_df=discharge_energy,
forecast_soh_df=forecast_soh)
    # Rename the value column of each indicator and attach its pid_name
cols = ['dev_id', 'pid_name', 'dt', 'val', 'station']
    dev_soh_.rename(columns={'discharge_soh': 'val'}, inplace=True)  # cockpit: historical actual SoH
    forecast_soh.rename(columns={'forecast_soh': 'val'}, inplace=True)  # cockpit: forecast actual SoH
real_soh = pd.concat([dev_soh_, forecast_soh], axis=0)
real_soh['pid_name'] = 'RealSOH'
    theory_soh.rename(columns={'theory_soh': 'val'}, inplace=True)  # cockpit: theoretical SoH (history and forecast)
theory_soh['pid_name'] = 'TheorySOH'
    used_cycles.rename(columns={'used_cycles': 'val'}, inplace=True)  # cockpit: used cycle count
used_cycles['pid_name'] = 'UsedRecycleTimes'
    remainder_cycles.rename(columns={'remainder_cycles': 'val'}, inplace=True)  # cockpit: remaining cycle count
remainder_cycles['pid_name'] = 'RemainingRecycleTimes'
    remainder_days.rename(columns={'remainder_days': 'val'}, inplace=True)  # cockpit: remaining usable days
remainder_days['pid_name'] = 'RemainingRecycleDays'
    forecast_profit.rename(columns={'profit': 'val'}, inplace=True)  # cockpit: forecast monthly profit
forecast_profit['pid_name'] = 'ForecastProfitMonth'
    enu_efficiency.rename(columns={'efficiency': 'val'}, inplace=True)  # cockpit: energy-unit efficiency
enu_efficiency['pid_name'] = 'BatteryEfficiencyDay'
    enu_full_charge.rename(columns={'charge_full': 'val'}, inplace=True)  # battery O&M: average full-charge energy curve
enu_full_charge['pid_name'] = 'AvgFullCharge'
    enu_full_discharge.rename(columns={'discharge_full': 'val'}, inplace=True)  # battery O&M: average full-discharge energy curve
enu_full_discharge['pid_name'] = 'AvgFullDischarge'
    enu_ch_ren.rename(columns={'retention': 'val'}, inplace=True)  # efficiency analysis: charge energy retention
enu_ch_ren['pid_name'] = 'ChargeEnergyRetention'
    enu_dis_ren.rename(columns={'retention': 'val'}, inplace=True)  # efficiency analysis: discharge energy retention
enu_dis_ren['pid_name'] = 'DischargeEnergyRetention'
    # Combine all indicators
derive_index = pd.concat([real_soh[cols], theory_soh[cols], used_cycles[cols], remainder_cycles[cols],
remainder_days[cols], forecast_profit[cols], batu_mil[cols], daily_mil[cols],
enu_efficiency[cols], enu_full_charge[cols], enu_full_discharge[cols],
enu_ch_ren[cols], enu_dis_ren[cols]
], axis=0)
derive_index.rename(columns={'dt': 'times'}, inplace=True)
derive_index['times'] = derive_index.times.apply(
lambda x: int(time.mktime(time.strptime(str(x)[:10], "%Y-%m-%d"))) * 1000)
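    # 'times' now holds the local-midnight epoch timestamp of each dt, in milliseconds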
derive_index['val'] = derive_index.val.astype('float')
derive_index['val'] = round(derive_index['val'], 3)
derive_index['dt'] = date
derive_index_spark = spark.createDataFrame(derive_index)
derive_index_spark = derive_index_spark.select(['dev_id', 'pid_name', 'times', 'val', 'dt', 'station'])
return derive_index_spark
if __name__ == '__main__':
dt = '2024-10-15'
data = calculate_derive_index(date=dt)
data.head()
#!/usr/bin/env python
# -*- coding:utf-8 -*-
'''
Combine four components - cell temperature/voltage consistency, cell internal resistance, the base
health score, and lithium-plating detection - to obtain the final health score.
Target: [health score]
'''
\ No newline at end of file
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from battery_health import data_load, configure, udf_collections
import pyspark.sql.functions as F
from pyspark.sql import Window
from battery_health.functions.data_verification import data_check
id_dict = {'Cell': 'cell_id', 'BatteryCluster': 'batc_id'}
val_dict = {'SOC': 'soc', 'Voltage': 'vol', 'Current': 'cur', 'Temperature': 'temp'}
def data_prep(date, station):
"""
    Data preparation for the battery health-score calculation; results are written to cell_base_data.
    Level: cell
    Columns: [temp, soc, vol, cur, power]
    Time range: the day's largest charge and discharge segments.
"""
    # Get the station's charge/discharge states; the goal is to keep the largest charge and discharge segments
state = data_load.get_state(date=date, station=station,
dev_id=configure.general_map[station]['id']['batu']['dev_id'])
if data_check.check_state_data(state):
print('Station:',station,'is Empty!')
return 0
window = Window.partitionBy(['state', 'dt', 'dev_id'])
state = state.withColumn('charge_flag', F.max('soc_diff').over(window))\
.withColumn('discharge_flag', F.min('soc_diff').over(window))
charge_state = state.filter((F.col('soc_diff') == F.col('charge_flag'))&(F.col('state') == 'charge'))
discharge_state = state.filter((F.col('soc_diff') == F.col('discharge_flag'))&(F.col('state') == 'discharge'))
state = charge_state.unionByName(discharge_state)
state.show()
    # Get the hours covered by charging/discharging, to reduce the data volume somewhat
state_hour = state.groupby(['dev_id', 'state']).apply(udf_collections.get_state_hour)\
.withColumnRenamed('dev_id', 'batu_id')
state_hour.show()
    # Load the corresponding data
temp_data = get_data_vol(date=date, station=station, tier='Cell', state_hour=state_hour, pid_name='Temperature')
volt_data = get_data_vol(date=date, station=station, tier='Cell', state_hour=state_hour, pid_name='Voltage')
cur_data = get_data_vol(date=date, station=station, tier='BatteryCluster', state_hour=state_hour, pid_name='Current')
soc_data = get_data_vol(date=date, station=station, tier='BatteryCluster', state_hour=state_hour, pid_name='SOC')
if station == 'hnc':
cur_data = cur_data.withColumn('cur', F.col('cur')*-1)
    # Merge the data according to the device relation table
rel = data_load.get_dev_rel(date=date, targets='cell_id, batc_id, batu_id',
station_id_list=configure.general_map[station]['details']['station_id'])
data = volt_data.join(rel, on=['cell_id'], how='left')\
.join(cur_data, on=['times','dt','station','batc_id'], how='full')\
.join(soc_data, on=['times','dt','station','batc_id'], how='full')\
.join(temp_data, on=['times','dt','station','cell_id'], how='full')
    data = data.withColumn('power', F.col('cur') * F.col('vol'))  # compute power
    # Filter the data by the exact charge/discharge start and end times
state = state.withColumnRenamed('dev_id', 'batu_id')
charge_data = data.join(
F.broadcast(state.filter(F.col('state') == 'charge').select(['batu_id', 'state', 'start_time', 'end_time'])),
on='batu_id', how='left')\
.filter((F.col('times') >= F.col('start_time')) & (F.col('times') <= F.col('end_time')))
discharge_data = data.join(
F.broadcast(state.filter(F.col('state') == 'discharge').select(['batu_id', 'state', 'start_time', 'end_time'])),
on='batu_id', how='left')\
.filter((F.col('times') >= F.col('start_time')) & (F.col('times') <= F.col('end_time')))
    # Combine the data
base_data = charge_data.unionByName(discharge_data).withColumn('dt',F.lit(date))
base_data = base_data.select(['batu_id', 'batc_id', 'cell_id', 'times', 'state', 'vol', 'cur', 'power', 'soc',
'temp', 'station', 'dt', ]).drop_duplicates()
return base_data
def get_data_vol(date, station, tier, state_hour, pid_name):
"""
    Fetch the target tier's data during the corresponding charge/discharge segments.
"""
    # Hours that contain data
sh = state_hour.select(['dt', 'hour','state']).toPandas()
hour_list = list(sh[sh['dt'] == max(sh.dt.unique())].hour)
hour_list = list(set(hour_list))
    # Load the dev_pid metadata
dev_pid = data_load.get_dev_pid(pid_name=pid_name, date=date, targets=['dev_id', 'pid'], tier=tier,
station_id_list=configure.general_map[station]['details']['station_id'])
pid_list = list(dev_pid.toPandas().pid)
    # Load the data by pid and hour
data = data_load.get_ods_data(pid=pid_list, date=date, targets=['pid', 'val', 'times', 'dt', 'station', 'hour'],
station=station, hour=hour_list)
    # Load yesterday's data (more than one dt means the segment crosses midnight)
if len(sh.dt.unique())>1:
print("存在跨天数据")
yes_date = sh.dt.min()
hour_list = list(sh[sh['dt'] == yes_date].hour)
hour_list = list(set(hour_list))
extra_data = data_load.get_ods_data(pid=pid_list, date=yes_date, station=station, hour=hour_list,
targets=['pid', 'val', 'times', 'dt', 'station', 'hour'])
data = data.unionByName(extra_data)
    data = data.join(state_hour, on=['dt', 'hour'], how='left')  # attach the state information
    # Resample and fill in missing data
freq_df = data_load.get_index('freq')
data = data.join(freq_df, on=['station'], how='left').groupby(['pid', 'state'])\
.apply(udf_collections.fill_time_by_resample)
    # Tidy up the data and return
data = data.withColumnRenamed('val',val_dict[pid_name]).join(dev_pid, on=['pid'],how='left')\
.drop('pid').withColumnRenamed('dev_id',id_dict[tier])
return data
def get_cell_base(date):
    # Prepare the data station by station: processing all stations in parallel produces too much data, causing long GC pauses that crash the service.
for i in configure.station_list:
data = data_prep(date=date, station=i)
if data != 0:
data.write.mode('overwrite').insertInto(configure.cell_base_table, overwrite=True)
print(i, "cell_base prepared")
#!/usr/bin/env python
# -*- coding:utf-8 -*-
'''
Identify the charge/discharge start and end times of the energy unit, battery unit and battery cluster
from SOC and power (voltage * current), and use those boundaries to compute the capacity of each level.
Targets: [start time, end time, charge/discharge state, SOC change, full charge/discharge flag] [capacity]
'''
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pyspark.sql.functions as F
from pyspark.sql import Window
from battery_health import data_load, configure
def calculate_dev_capacity(date):
"""
    Compute the capacities in dev_data.
    Levels: [enu, batu, batc]
"""
data = data_load.get_data_general(configure.dev_table, date)
state = data_load.get_state(date)
state = state.select(['dev_id', 'dt', 'state', 'start_time', 'end_time'])
window = Window.partitionBy(['dev_id', 'dt', 'state', 'start_time', 'end_time'])
capacity_df = data.join(F.broadcast(state), on=['dev_id', 'dt'], how='inner')
    # Compute the capacity
capacity_df = capacity_df.filter((F.col('times') >= F.col('start_time')) & (F.col('times') <= F.col('end_time')))\
.withColumn('power_sum', F.sum('power').over(window))\
.withColumn('capacity', F.abs(F.col('power_sum')) / (3600 / 10))
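    # With ~10 s samples, |sum(power)| / (3600 / 10) integrates power over the segment,
    # i.e. energy in kWh if power is reported in kW (units assumed).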
capacity_df = capacity_df.select(['dev_id', 'state', 'start_time', 'capacity', 'dt', 'station']).drop_duplicates()
return capacity_df
\ No newline at end of file
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from battery_health import data_load, configure, udf_collections
import pyspark
import pyspark.sql.functions as F
import pandas as pd
from dateutil.relativedelta import relativedelta
from battery_health.functions.data_verification import data_check
RENAME_DICT = {'SOC':'soc', 'Voltage':'vol', 'Current':'cur', 'ActivePower':'power'}
# Used by the dev_data computation: fetch all data of the given type (all stations, all tiers)
def get_dev_data(pid_name, date, yesterday_hour = None, freq=None):
"""
    This function fetches a single data type.
    The process, in words: "from the metadata tables, fetch data of type D for station A, date B and tier C".
    Although the interface is built to accept several types at once, querying a single type at a time is recommended.
    :param yesterday_hour:
    :param date:
    :type pid_name: list or str
    :param freq: defaults to 10s
:return: ['dev_id', 'times', 'val', 'dt', 'station']
"""
    # Temporary workaround for the sampling frequency, fixed at 10 s for now; per-station customization would require another UDF.
    # From the pid/device table, get all pids of the desired data type (pid_name).
dev_pid = data_load.get_dev_pid(pid_name=pid_name, date=date).select(['pid', 'dev_id'])
pid_list = list(dev_pid.toPandas()['pid'])
    # Temporary workaround for hnc, whose battery-unit active power is bound to the PCS
if pid_name == 'ActivePower':
try:
hnc_id = [configure.general_map['hnc']['details']['station_id']]
dc_dev_pid = data_load.get_dev_pid(station_id_list = hnc_id, tier=['PCS'], pid_name=pid_name,
date=date).select(['pid', 'dev_id'])\
.replace(['628549'], ['628607'], 'dev_id')
dev_pid = dev_pid.unionByName(dc_dev_pid)
pid_list = list(dev_pid.toPandas()['pid'])
print('change hnc batc_id from 628549 to 628607 and use DcActivePower.')
except pyspark.sql.utils.ParseException:
print('Empty DcActivePower of PCS !!!')
    # Load the current day's base data
data = data_load.get_ods_data(targets=['pid', 'val', 'times', 'dt', 'station'], pid=pid_list, date=date)
last_date = str((pd.to_datetime(date[0]) - relativedelta(days=1)).date())
last_date = str(last_date).replace('[', '(').replace(']', ')')
    # If yesterday's data is needed, take this branch
if yesterday_hour is not None:
        # Fetch yesterday's data: the last n hours for every station.
extra_data = data_load.get_ods_data(targets=['pid', 'val', 'times', 'dt', 'station'], pid=pid_list,
date=last_date, hour=yesterday_hour)
        # Merge with the current day's data
data = data.unionByName(extra_data)
else :
extra_data = data_load.get_ods_data(targets=['pid', 'val', 'times', 'dt', 'station'], pid=pid_list,
date=last_date)
        # Merge with the current day's data
data = data.unionByName(extra_data)
    # Resample at the preset frequency and rename columns to simplify the joins that follow.
freq_df = data_load.get_index('freq')
data = data.join(freq_df, on=['station'], how='left').groupby('pid').apply(udf_collections.fill_time_by_resample)
data = data.join(dev_pid, on=['pid'], how='left').withColumnRenamed('val', RENAME_DICT[pid_name]).drop('pid')
return data
# Assemble dev_data; where power cannot be read directly, compute it as cur * vol
def data_prep(date, yesterday_hours = None):
"""
    :param date: the date
    :param yesterday_hours: last n hours of the previous day; omit it, or pass a value outside [0, 23], to disable it
:return: dev_data
"""
    # Compute the last n hours of yesterday and normalize the date format.
if yesterday_hours is not None:
yes_range = [str(24-i) for i in range(yesterday_hours)]
else:
yes_range = None
    if isinstance(date, str):
        date = [date]
    # Fetch the four data types.
soc_data = get_dev_data(pid_name='SOC', date=date, yesterday_hour=yes_range)
power_data = get_dev_data(pid_name='ActivePower', date=date, yesterday_hour=yes_range)
cur_data = get_dev_data(pid_name='Current', date=date, yesterday_hour=yes_range)
vol_data = get_dev_data(pid_name='Voltage', date=date, yesterday_hour=yes_range)
    # Merge the data and compute power.
same_columns = ['dev_id', 'times', 'dt', 'station']
data = soc_data.join(cur_data, on=same_columns, how='left').join(vol_data, on=same_columns, how='left')\
.join(power_data, on=same_columns, how='left').withColumn('dt', F.lit(date[0]))
data = data.withColumn('power',F.when(F.col('power').isNull(),F.col('vol') * F.col('cur') / 1000)
.otherwise(F.col('power'))).drop('cur').drop('vol').dropna()
data = data.withColumn('power',F.when(F.col('station')=='hnc',F.col('power')*-1).otherwise(F.col('power')))
data_check.check_dev_data(data)
data = data.select(['dev_id', 'times', 'soc', 'power', 'station', 'dt'])
return data
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from battery_health import udf_collections, configure, data_load
import pyspark.sql.functions as F
import pyspark.sql.types as t
def calculate_state(date):
data = data_load.get_data_general(configure.dev_table, date)
    # Get the required device IDs; this is not written well and will be refined later (hnc does not need batc, and batu also uses BATU_ID from the configuration).
dev_list = configure.dev_id_list
    # Get the other indicators (threshold, full-charge criterion) and join them on for later use.
full_ind_df = data_load.get_index(index='full_charge_index')
threshold_df = data_load.get_index(index='threshold')
index_df = full_ind_df.join(threshold_df, on=['station'],how='left')
prep_data = data.join(index_df, on=['station'],how='left')
prep_data = prep_data.withColumn('power', F.when(F.abs(F.col('power')) < F.col('threshold'), 0)
.otherwise(F.col('power')))
    # Only the basic calculation is implemented so far; data validation still needs to be added
zero_df = prep_data.filter(F.col('dev_id').isin(dev_list)).groupby(['dev_id']).apply(udf_collections.get_state)
zero_df = zero_df.withColumn('dt', F.col('end_time').astype(t.DateType())).filter(F.col('dt')==date)
zero_df = zero_df.select(['dev_id', 'start_time', 'end_time', 'state', 'soc_diff', 'full_charge', 'dt', 'station'])
return zero_df
#!/usr/bin/env python
# -*- coding:utf-8 -*-
'''
Further process the data to examine operating behaviour from a statistical point of view.
Targets: [charge-end SOC, discharge-end SOC, charge-end max voltage difference (batu), discharge-end max voltage difference (batu),
charge-end voltage difference (cell), discharge-end voltage difference (cell), charge-end voltage standard deviation (batu),
discharge-end voltage standard deviation (batu), charge-end voltage deviation (cell), discharge-end voltage deviation (cell)]
'''
\ No newline at end of file
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from battery_health.functions.data_verification import data_check
from battery_health import data_load, configure
from dateutil.relativedelta import relativedelta
import pyspark.sql.functions as F
from pyspark.sql import Window
import pandas as pd
def get_state_val(data, state, pid_name):
    # Get the specified data type for the specified state.
name_dict = {'vol_diff': 'EndVoltageDiff', 'deviation': 'EndVoltageDeviation', 'vol_range': 'EndMaxVoltDiff',
'std_dev': 'EndVoltSTD', 'soc': 'EndSOC'}
state_dict = {'charge': 'Charge', 'discharge': 'Discharge'}
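    # e.g. state='charge' with pid_name='soc' yields the output pid_name 'ChargeEndSOC'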
data = data.select(['dev_id', 'state', pid_name, 'dt', 'station'])
data = data.filter(F.col('state')==state).withColumnRenamed(pid_name, 'val')\
.withColumn('pid_name', F.lit('{state}{pid_name}'.format(state=state_dict[state], pid_name=name_dict[pid_name])))
data = data.select(['dev_id', 'pid_name', 'val', 'dt', 'station'])
return data
def get_statistic(date, cycles):
days = [str((pd.to_datetime(date) - relativedelta(days=i)).date()) for i in range(cycles)]
data = data_load.get_data_general(configure.statistic_mid_table, date=days)
    # Charge-end SOC must exceed the configured range
data = data_check.check_soc_data(data=data, col_name='soc')
    # Validate the voltage data
data = data_check.check_cell_data(data=data, col_name='vol')
window = Window.partitionBy(['batu_id', 'state', 'dt', 'times'])
window_dt = Window.partitionBy(['batu_id', 'state', 'dt'])
analysis_data = data.withColumn('std_dev', F.stddev('vol').over(window)) \
.withColumn('max_dev', F.max('std_dev').over(window_dt)).filter(F.col('std_dev') == F.col('max_dev'))
analysis_data = analysis_data.cache()
analysis_data.count()
    # Battery-unit health metrics
batu_health = analysis_data.withColumn('vol_range', F.max('vol').over(window_dt) - F.min('vol').over(window_dt)) \
.withColumnRenamed('batu_id', 'dev_id')\
.select(['dev_id', 'state', 'soc', 'vol_range', 'std_dev', 'dt', 'station']).drop_duplicates()
    # Cell health metrics
cell_health = analysis_data.withColumn('max_vol', F.max('vol').over(window_dt)) \
.withColumn('min_vol', F.min('vol').over(window_dt)) \
.withColumn('vol_diff', F.when(F.col('state') == 'charge', F.col('max_vol') - F.col('vol'))
.otherwise(F.col('vol') - F.col('min_vol'))) \
.withColumn('mean_vol', F.mean('vol').over(window_dt)) \
.withColumn('deviation', (F.col('vol') - F.col('mean_vol')) / F.col('mean_vol')) \
.withColumnRenamed('cell_id', 'dev_id') \
.select(['dev_id', 'state', 'soc', 'vol_diff', 'deviation', 'dt', 'station'])
batu_health = batu_health.cache()
batu_health.count()
cell_health = cell_health.cache()
cell_health.count()
    # Battery unit and cell: end-of-charge/discharge SOC
batu_charge_soc = get_state_val(data=batu_health, state='charge', pid_name='soc')
batu_discharge_soc = get_state_val(data=batu_health, state='discharge', pid_name='soc')
cell_charge_soc = get_state_val(data=cell_health, state='charge', pid_name='soc')
cell_discharge_soc = get_state_val(data=cell_health, state='discharge', pid_name='soc')
    # Battery unit: end-of-charge/discharge voltage range and voltage standard deviation
batu_diff_charge = get_state_val(batu_health, state='charge', pid_name='vol_range')
batu_diff_discharge = get_state_val(batu_health, state='discharge', pid_name='vol_range')
batu_std_charge = get_state_val(batu_health, state='charge', pid_name='std_dev')
batu_std_discharge = get_state_val(batu_health, state='discharge', pid_name='std_dev')
    # Cell: end-of-charge/discharge voltage difference and voltage deviation
cell_vol_diff_charge = get_state_val(data=cell_health, state='charge', pid_name='vol_diff')
cell_vol_diff_discharge = get_state_val(data=cell_health, state='discharge', pid_name='vol_diff')
cell_vol_deviation_charge = get_state_val(data=cell_health, state='charge', pid_name='deviation')
cell_vol_deviation_discharge = get_state_val(data=cell_health, state='discharge', pid_name='deviation')
    # Combine the data
soc_data = batu_charge_soc.unionByName(batu_discharge_soc).unionByName(cell_charge_soc) \
.unionByName(cell_discharge_soc)
batu_data = batu_diff_charge.unionByName(batu_diff_discharge).unionByName(batu_std_charge) \
.unionByName(batu_std_discharge)
cell_data = cell_vol_diff_charge.unionByName(cell_vol_diff_discharge).unionByName(cell_vol_deviation_charge)\
.unionByName(cell_vol_deviation_discharge)
data = soc_data.unionByName(batu_data).unionByName(cell_data)
    # Convert the time to a timestamp
data = data.withColumn('dt', F.lit(date))
data = data.withColumn('times', F.unix_timestamp(F.col('dt'),"yyyy-MM-dd")*1000).withColumn('dt', F.lit(date))
data = data.select(['dev_id', 'pid_name', 'times', 'val', 'dt', 'station'])
analysis_data.unpersist()
batu_health.unpersist()
cell_health.unpersist()
return data
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from battery_health import data_load, configure, udf_collections
from battery_health.functions import data_verification
from battery_health.functions.data_verification import data_check
import pyspark.sql.functions as F
import pandas as pd
import numpy as np
def get_tier_data(date, pid_name, tier, state, station_list=None):
print("-----------------------------------pid_name:", pid_name)
if station_list is None:
station_list = configure.station_list
    # State/hour lookup table
state_hour = state.withColumn('hour', F.substring(F.col('end_time'), 12, 2)).select(['station', 'hour']).dropDuplicates()
station_id = [configure.general_map[i]['details']['station_id'] for i in station_list]
    # pid/device metadata
dev_pid = data_load.get_dev_pid(pid_name=pid_name, targets=['dev_id', 'pid'], date=date, tier=tier, station_id_list=station_id)
pid = list(dev_pid.toPandas().pid.unique())
    # Load all the data first, then keep only the matching hours via 'hour'.
data = data_load.get_ods_data(pid=pid, date=date, targets=['pid', 'val', 'times', 'dt', 'station', 'hour'], station=station_list)
data = data.join(F.broadcast(state_hour), on=['station','hour'], how='inner')
    # Data patching: append the start/end timestamps so that gap filling cannot miss them
state_data = state.withColumnRenamed('end_time', 'times').join(dev_pid, on=['dev_id'], how='left') \
.select(['pid', 'times', 'dt', 'station']).withColumn('val', F.lit(None).cast('double')) \
.withColumn('hour', F.substring(F.col('times'), 12, 2))
    # Before unionByName, drop rows in state_data whose key columns are null
state_data = state_data.filter(
(F.col('pid').isNotNull()) & (F.col('dt').isNotNull()) & (F.col('hour').isNotNull())
)
# union
data = data.unionByName(state_data)
    # Start of mock-data code -------------------------------------------------------------
# from pyspark.sql import SparkSession
# from pyspark.sql.types import StructType, StructField, StringType, DoubleType
#
    # # Initialize Spark
# spark = SparkSession.builder.master("local[*]").appName("test").getOrCreate()
#
    # # Define the schema
# schema = StructType([
# StructField("station", StringType(), True),
# StructField("hour", StringType(), True),
# StructField("pid", StringType(), True),
# StructField("val", DoubleType(), True),
# StructField("times", StringType(), True),
# StructField("dt", StringType(), True),
# ])
#
    # # Build the sample rows
# data_list = [
# ("hnc", "16", "111510044001206", 3.197, "2024-10-15 16:00:03", "2024-10-15"),
# ("hnc", "16", "111510044001206", 3.199, "2024-10-15 16:00:03", "2024-10-15"),
# ("hnc", "16", "111510044001209", 3.194, "2024-10-15 16:00:03", "2024-10-15"),
# ("hnc", "16", "111510044001217", 3.194, "2024-10-15 16:00:03", "2024-10-15"),
# ("hnc", "16", "111510044001221", 3.197, "2024-10-15 16:00:03", "2024-10-15"),
# ("hnc", "16", "111510044001226", 3.200, "2024-10-15 16:00:03", "2024-10-15"),
# ("hnc", "16", "111510044001221", None, "2024-10-15 16:00:03", "2024-10-15"),
# ("hnc", "16", "111510044001231", 3.201, "2024-10-15 16:00:03", "2024-10-15"),
# ("hnc", "16", "111510044001231", None, "2024-10-15 16:00:03", "2024-10-15"),
# ("hnc", "16", "111510044001231", 3.201, "2024-10-15 16:00:03", "2024-10-15"),
# ]
#
    # # Create the DataFrame
# df = spark.createDataFrame(data_list, schema)
# df.show()
# df = df.groupby(['pid', 'dt', 'hour']).apply(udf_collections.statistic_data_adjustment)
# df.show()
# data = df
    # End of mock-data code ---------------------------------------------------------------
    # Handle the case where the start/end timestamp is the last point of the series
data = data.groupby(['pid', 'dt', 'hour']).apply(udf_collections.statistic_data_adjustment)
    # Resample and fill in data
freq = data_load.get_index('freq')
data = data.join(F.broadcast(freq), on=['station'], how='left').groupby(['pid', 'dt', 'hour'])\
.apply(udf_collections.fill_time_by_resample)
    # Tidy up the data
data = data.join(dev_pid,on=['pid'], how='left').drop('pid')
s = state.select(['dev_id', 'end_time', 'state', 'dt', 'station']).withColumnRenamed('end_time' ,'times')
data = data.join(F.broadcast(s), on=['times', 'dt', 'station', 'dev_id'], how='right')
return data
def data_prep_station(date):
batu_id = data_load.get_id_by_tier(tier='batu')
    state = data_load.get_state(date=date, dev_id=batu_id)  # battery-unit state table
state = state.cache()
state.count()
    rel = data_load.get_dev_rel(date=date, targets=['batu_id', 'cell_id'])  # device relation table
cell_state = state.join(rel.withColumnRenamed('batu_id', 'dev_id'), on=['dev_id'], how='full') \
        .drop('dev_id').withColumnRenamed('cell_id', 'dev_id').dropna()  # cell state table
    # Voltage: every cell can provide voltage data.
vol_data = get_tier_data(date=date, pid_name='Voltage', tier='Cell', state=cell_state) \
.withColumnRenamed('dev_id', 'cell_id').withColumnRenamed('val', 'vol')
vol_data = data_verification.data_check.check_cell_data(data=vol_data, col_name='vol')
vol_data = vol_data.join(rel, on=['cell_id'], how='left')
soc_data = get_tier_data(date=date, pid_name='SOC', tier='BatteryUnit', state=state,
station_list=configure.station_list) \
.withColumnRenamed('dev_id', 'batu_id').withColumnRenamed('val', 'soc')
soc_data = soc_data.join(rel, on=['batu_id'], how='left')
'''
    # cell_SOC: assume every cell can provide SOC data.
cell_soc_data = get_tier_data(date=date, pid_name='SOC', tier='Cell', state=cell_state,
station_list=configure.station_list) \
.withColumnRenamed('dev_id', 'cell_id').withColumnRenamed('val', 'cell_soc')
cell_soc_data = cell_soc_data.join(rel, on=['cell_id'], how='left')
    # batu_SOC: fetch the battery-unit level SOC again; it is needed by later calculations
batu_soc_data = get_tier_data(date=date, pid_name='SOC', tier='BatteryUnit', state=state,
station_list=configure.station_list) \
.withColumnRenamed('dev_id', 'batu_id').withColumnRenamed('val', 'batu_soc')
batu_soc_data = batu_soc_data.join(rel, on=['batu_id'], how='left')
soc_data = batu_soc_data.join(cell_soc_data, on=['batu_id', 'cell_id', 'times', 'dt', 'station', 'state'],
how='left')
'''
    # Combine the data and write the table
data = soc_data.join(vol_data, on=['batu_id', 'cell_id', 'times', 'dt', 'station', 'state'], how='left')
    # data = data.select(['batu_id', 'cell_id', 'soc', 'vol', 'state', 'times', 'station', 'dt'])  # dt and station are swapped below
data = data.select(['batu_id', 'cell_id', 'soc', 'vol', 'state', 'times', 'dt', 'station'])
return data
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from battery_health.functions.statistic_data import statistic_calculation, statistic_data_prep
def data_prep(date):
data = statistic_data_prep.data_prep_station(date=date)
return data
def statistic_summary(date):
data = statistic_calculation.get_statistic(date=date, cycles=1)
return data
\ No newline at end of file
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from battery_health import data_load, configure, udf_collections
import pyspark.sql.functions as F
import time
import pyspark.sql.types as t
import pandas as pd
from dateutil.relativedelta import relativedelta
from pyspark.sql import Window
from battery_health.functions.data_verification import data_check
import requests
def empty_pids(date, pid_name_list):
struct_time = time.strptime(date, '%Y-%m-%d')
date_s = str(int(time.mktime(struct_time))*1000000000)
dev_pid = data_load.get_dev_pid(pid_name=pid_name_list, date=date, station_id_list='628421',
tier=['BatteryCluster', 'BatteryUnit', 'Storage', 'Cell'])
dp = dev_pid.toPandas()
for i in range(10):
print('round:', i)
dp_ = dp[i::10]
dp_list = dp_.pid.to_list()
#dp_list = ['11159999938769','11159999938775','11159999938774','11159999952235']
        s = str(dp_list).replace('[', '%28pid%3D%27').replace(',', '%27+or+pid%3D%27').replace("'", '') \
            .replace(' ', '').replace(']', '%27%29+and+time%3D') + date_s
headers = {
'Accept': 'application/csv',
}
response = requests.get(
# 'https://ts-bp1ue44w0814t8cx9.influxdata.tsdb.aliyuncs.com:8086/query?db=sgool&u=huafeng&p=Huafeng@2022&precision=ms&q=delete+from+data_meas_1114_1d+where+{}'.format(s),
'http://192.168.1.99:48088/query?db=sgool&u=yunhe&p=yunhe2020&precision=ms&q=delete+from+data_meas_1114_1d+where+{}'.format(s),
headers=headers,
)
print('round:', i, 'xmei delete finished')
response = requests.get(
# 'https://ts-bp1ue44w0814t8cx9.influxdata.tsdb.aliyuncs.com:8086/query?db=sgool&u=huafeng&p=Huafeng@2022&precision=ms&q=delete+from+data_meas_1115_1d+where+{}'.format(s),
'http://192.168.1.99:48088/query?db=sgool&u=yunhe&p=yunhe2020&precision=ms&q=delete+from+data_meas_1115_1d+where+{}'.format(s),
headers=headers,
)
print('round:', i, 'hnc delete finished')
print('x')
def cal_batu_temp_cons(date):
batu_window = Window.partitionBy(['batu_id', 'dt', 'times'])
batu_temp = data_load.get_cell_base(date=date, targets=['batu_id', 'temp', 'dt', 'times', 'station'])
batu_temp = data_check.check_cell_data(batu_temp, 'temp')
batu_temp_cons = batu_temp.withColumn('temp_range', F.max('temp').over(batu_window) -
F.min('temp').over(batu_window))
window_temp = Window.partitionBy(['batu_id', 'dt'])
batu_temp_cons = batu_temp_cons.withColumn('temp_range', F.max('temp_range').over(window_temp))
batu_temp_cons = batu_temp_cons.select(['batu_id', 'temp_range', 'dt', 'station']).dropDuplicates()
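    # temp_range is the day's maximum spread (max cell temp - min cell temp) within each battery unit;
    # it is published downstream under the pid_name 'MaxCellTempRange'.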
batu_temp_cons.write.format('hive').insertInto(configure.batu_tempCons_table, overwrite=True)
def sava_to_hive(date):
# create_risk_data(date=date)
# save_data = create_capacity_soh(date=date)
cal_batu_temp_cons(date)
save_data = data_load.get_data_general(configure.soh_table, date=date)
# save_data = spark.sql("select * from test.decay_data")
# health_data = create_health_data(date=date)
health_data = data_load.get_data_general(configure.health_data, date=date)
health_score = data_load.get_data_general(configure.health_score, date=date)
health_score = health_score.withColumn('times', F.lit(str(int(time.mktime(time.strptime(date, "%Y-%m-%d"))) * 1000)))
# days = [str((pd.to_datetime(date) - relativedelta(days=i)).date()) for i in range(365)]
batu_temp_cons = data_load.get_data_general(table=configure.batu_tempCons_table, date=date)\
.withColumnRenamed('batu_id', 'dev_id').withColumnRenamed('dt', 'times').withColumnRenamed('temp_range', 'val')
batu_temp_cons = batu_temp_cons.withColumn('pid_name', F.lit('MaxCellTempRange'))\
.withColumn('times', F.unix_timestamp('times', 'yyyy-MM-dd'))\
.withColumn('times', F.col('times') * 1000).withColumn('dt', F.lit(date))\
.select(['dev_id', 'pid_name', 'times', 'val', 'dt', 'station'])
influxdb_data = save_data.unionByName(health_data).unionByName(health_score).unionByName(batu_temp_cons)
pid_data = data_load.get_dev_pid(targets=['pid', 'dev_id', 'pid_name'], date=date,
tier=['BatteryCluster', 'BatteryUnit', 'Storage', 'Cell'])
hive_data = influxdb_data.join(pid_data, on=['dev_id', 'pid_name'], how='left')\
.filter(F.col('station').isin(configure.station_list))
hive_data = hive_data.withColumn('dt', F.lit(date)).select(['pid', 'times', 'val', 'dt', 'station']).dropna()
    # Clear the current day's 00:00 data for the pids that are about to be written
empty_pids(date, configure.pid_name_list)
return hive_data
----------------------------------------------------------------
Wed Mar 05 10:55:38 GMT+08:00 2025:
Booting Derby version The Apache Software Foundation - Apache Derby - 10.12.1.1 - (1704137): instance a816c00e-0195-643a-eecd-000032a852b0
on database directory C:\project\other\algorithm\algorithm\huafon_battery_health\metastore_db with class loader org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1@20f2ae70
Loaded from file:/C:/tools/spark-2.4.8-bin/jars/derby-10.12.1.1.jar
java.vendor=Oracle Corporation
java.runtime.version=1.8.0_271-b09
user.dir=C:\project\other\algorithm\algorithm\huafon_battery_health
os.name=Windows 10
os.arch=amd64
os.version=10.0
derby.system.home=null
Database Class Loader started - derby.database.classpath=''
-----BEGIN RSA PRIVATE KEY-----
MIIEowIBAAKCAQEAzxmx71w1rsEiE7Ub1pUcaIOvzKCRwHoj4ao80DKz+Cq3nR32
NtmhxjxwlMKURjdp/YN+093KrVyC7ry+xw+l+4v+9hwuDa4cu0xJPiwjluuMdf1l
Coy1NYBI1AdxUqhKd9d6WkZ+5t8E/bqBReVKMLEtNNAhbsDiX5G8ZsmI37aLWgp7
EonknLbDjX6aInMFWjj6wh2NpzL+jltszNcT9YMtlOyBAWFgdPAP2cWlcKzW7FCB
DtrzJz8EGG6HVyD3mKTfOrjrsFzWdxMKR2SxiN3IuO6/fVfGELO+aKqAZcIoiA2K
7oNpKXrI9DXlF2YBIxVOT/tge0Z9Xyn+YVclbwIDAQABAoIBAGOPhLz/ZUdXzZwP
ywxDrt6HwDHtV2Ri03EfOBEW7vH1B0RfLQh9Y4o7cBvz1vhSl3+qgYQ08cSZ3tAK
qORln3Pof2+taG5nL6TTLbrX2VwqbOIkKTkDH/JSc7CKy23jPbmTO1y+2y7+NBYO
XStO6XWzF3p2PDxUOf+bvijj5gs1hu+HtYijq/SWyI/le2gD7AbW0xM7veGHkNZA
yc5LAN86omeaGwJLV7VzCP4ALRLL2XFFpwA++ZvbEF4PCW7lCOTxBqpOzoTGfmN5
O2Lj9RbjHNJvMFgmVsy2hK1SFf0Z9fI1s6BWoQ4yI9ZXg4F5hgmr5rtGmD6HeGPM
fFoR6gECgYEA8Z1Aic+eHBKLBaGRVtwRa3AtRRaoCnfYFD3x8PD3cDpc4G5RUUfI
Ni/K7Ao7KEJImC9yHNcJov0k8oSGoU7R9fEGaCJjwRf6f6FmNkNCczvgHNHvmTkR
r4xRt41XZTnrrmLnN+0qVIA+O2auY4AqzFJWnjIq8cRqUy99If2zDYECgYEA225f
hefClSLXgRbL1yVMyJSfzfbfaQsSSYNguWA6+ex9xDw7Ye/ba36idokVfX+SzUgE
G4dZfyOUa2+bMtWqxuAj41j3eTAYFFpE4XA5QTgTCzN9THYstm70HF/LqSU9iBpt
TzPFuQQFPBVTrvSE6HFedphVwi+XDwEkUeDmiu8CgYEAhFobzGIKvl36Aa+rqeUL
NctRQRNUIcfcedok+lNFeBjAX8COkvO7XNN5WSuRlFAa7CKxY9L32GzLHH40MZC3
uv25ALo14sR72AZVs0vMzsrxzVfC5DA62+sFqIKoaS79R52uAxjLo1ZMwMVSqfa/
ewVvpWDd3Wo2xDKzXTdYKgECgYBPE+200hrbqBzF1rNLK5QKTRVyIl/M+UJz37bB
154pZ0LDr3kvCEOo75AY67ok6g67kBJ64UItgWMBfM0PetT6qtgEHJHCyMREwWtF
Wy4nBNBIHxwuq//dFws+Fn/MyzDrlaqC+oNs87f3OTBZQqGLKyAB2VA+lOv5ak3u
fABZXQKBgBOzFEgWzHBk8elTbSotQ0EiTzbCXhYlOpDRB1PTGS0VRsZ1ywoFJ+5x
LpzmP199dyolaGOVjV0W/iZbDdxSadqH7bCOMa8in+5Hz4qOMmtQD5JpVy7LLNw2
N8LC2PprTcNmYWYEUP+ikQtOKcLu4jodjL6R9UEXTenO3paNVSy0
-----END RSA PRIVATE KEY-----
# *************************************************************************
# *** DO NOT TOUCH FILES IN THIS DIRECTORY! ***
# *** FILES IN THIS DIRECTORY AND SUBDIRECTORIES CONSTITUTE A DERBY ***
# *** DATABASE, WHICH INCLUDES THE DATA (USER AND SYSTEM) AND THE ***
# *** FILES NECESSARY FOR DATABASE RECOVERY. ***
# *** EDITING, ADDING, OR DELETING ANY OF THESE FILES MAY CAUSE DATA ***
# *** CORRUPTION AND LEAVE THE DATABASE IN A NON-RECOVERABLE STATE. ***
# *************************************************************************
\ No newline at end of file
# *************************************************************************
# *** DO NOT TOUCH FILES IN THIS DIRECTORY! ***
# *** FILES IN THIS DIRECTORY ARE USED BY THE DERBY DATABASE RECOVERY ***
# *** SYSTEM. EDITING, ADDING, OR DELETING FILES IN THIS DIRECTORY ***
# *** WILL CAUSE THE DERBY RECOVERY SYSTEM TO FAIL, LEADING TO ***
# *** NON-RECOVERABLE CORRUPT DATABASES. ***
# *************************************************************************
\ No newline at end of file
# *************************************************************************
# *** DO NOT TOUCH FILES IN THIS DIRECTORY! ***
# *** FILES IN THIS DIRECTORY ARE USED BY THE DERBY DATABASE TO STORE ***
# *** USER AND SYSTEM DATA. EDITING, ADDING, OR DELETING FILES IN THIS ***
# *** DIRECTORY WILL CORRUPT THE ASSOCIATED DERBY DATABASE AND MAKE ***
# *** IT NON-RECOVERABLE. ***
# *************************************************************************
\ No newline at end of file