I have a pipeline where I get 4000k HL7 files. I have to convert them to CSV files. Each file has many HL7 segments, and each OBX segment becomes one column (COL1, COL2, ... COL100) holding its value and time. Each file may have 50 to 100 OBX segments, i.e. columns. I am looping over each column, creating a pandas DataFrame and appending columns, which takes a long time. I observed that the final merge (in the function process_hl7msg) takes most of the time.
def parse_segments(segments):
    df_num = pd.DataFrame()
    for segment in segments:
        obx_timestamp = get obx_timestamp from segment
        obs_identifier = get obs_identifier from segment
        observation_value = get observation value from segment
        device = get device info from segment
        df = pd.DataFrame()
        df = df.append({"Time": obx_timestamp, obs_identifier: observation_value, "device": device}, ignore_index=True)
        if df_num.empty:
            df_num = df
        else:
            df_num = pd.merge(df_num, df, on=["Time", "device"])
    return df_num
def process_hl7msg():
    df_list = []
    for file_name in file_list:
        segments = get segments
        df_list.append(parse_segments(segments))
    df = pd.DataFrame()
    for df1 in df_list:
        if df.empty:
            df = df1
        else:
            df = pd.merge(df, df1, on=["Time", "device"], how='outer')
    return df
Below is an example of each parsed HL7 file and the expected output.
File 1
Time                      EVENT  device   COL1  COL2
20200420232613.6200+0530  start  device1  1.0   2.3
20200420232614.6200+0530         device1  4.4   1.7

File 2
Time                      EVENT  device   COL3  COL4  COL5
20200420232613.6200+0530         device1  44    66    7
20200420232614.6200+0530         device2  1.0   2.3   0.5
20200420232615.6200+0530  pause  device3  4.4   1.7   0.9
File 3
20200420232613.6200+0530 device2 1.0 2.3
...
File 4000
**Expected Output:**
Time EVENT device COL1 COL2 COL3 COL4 COL5
20200420232613.6200+0530 start device1 1.0 2.3 44 66 7
20200420232613.6200+0530 device2 1.0 2.3
20200420232614.6200+0530 end device1 4.4 1.7
20200420232615.6200+0530 pause device2 1.0 2.3 0.5
20200420232616.6200+0530 device3 4.4 1.7 0.9
Any suggestions for optimization would be greatly appreciated.
Update 1: Added the EVENT column.
obx_timestamp = 20200420232616.6200+0530
obs_identifier = any one or more values from the list (COL1, COL2, ... COL10)
observation_value = any numeric value
device = any one from the list (device1, device2, device3, device4, device5)
Update 2: Added the EVENT column (see the sample records below).
t3 = [{'Time': 100, 'device': 'device1', 'EVENT': 'event', 'obx_idx': 'MDC1', 'value': 1.2},
      {'Time': 100, 'device': 'device1', 'obx_idx': 'COL2', 'value': 4.5},
      {'Time': 100, 'device': 'device1', 'obx_idx': 'COL4', 'value': 4.5},
      {'Time': 200, 'device': 'device3', 'obx_idx': 'COL2', 'value': 2.5},
      {'Time': 200, 'device': 'device3', 'obx_idx': 'COL3', 'value': 2.5}]

df = pd.DataFrame.from_records(t3, index=['Time', 'device', 'EVENT', 'obx_idx'])['value'].unstack()
Try setting the index on both dataframes and then doing a join:
df.set_index(["Time", "device"], inplace=True)
df1.set_index(["Time", "device"], inplace=True)
df.join(df1, how = 'outer')
However, given the expected output, you could also try a concat along axis=1:
df.set_index(["Time", "device"], inplace=True)
df1.set_index(["Time", "device"], inplace=True)
df_f = pd.concat([df, df1], axis=1)
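To make both variants concrete, here is a minimal, self-contained sketch on two tiny stand-in frames; the column names and values are invented for illustration and are not taken from the real HL7 files:

import pandas as pd

# two tiny stand-in frames for the per-file results (hypothetical data)
df = pd.DataFrame({"Time": [1, 2], "device": ["device1", "device1"],
                   "COL1": [1.0, 4.4], "COL2": [2.3, 1.7]})
df1 = pd.DataFrame({"Time": [1, 3], "device": ["device1", "device3"],
                    "COL3": [44.0, 0.9]})

df.set_index(["Time", "device"], inplace=True)
df1.set_index(["Time", "device"], inplace=True)

# outer join keeps every (Time, device) pair seen in either frame
joined = df.join(df1, how="outer")

# concat along columns aligns on the shared index; same result here
stacked = pd.concat([df, df1], axis=1)

print(joined)
print(stacked)

Both calls align on the (Time, device) index; the practical difference is that pd.concat takes a whole list of frames at once, which avoids repeated pairwise joins when you have thousands of files.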
Here is how you could change your functions. The idea is not to create a dataframe on every iteration of the loop in parse_segments, but only once at the end using from_records, specifying the index levels so that you can call unstack right after, and then to use pd.concat with axis=1 in process_hl7msg. Try:
def parse_segments(segments):
    l_seg = []
    for segment in segments:
        obx_timestamp = get obx_timestamp from segment
        obs_identifier = get ...
        observation_value = get observation value from segment
        device = get device info from segment
        # append a dictionary to a list
        l_seg.append({'time': obx_timestamp, 'device': device,
                      'obs_idx': obs_identifier, 'value': observation_value})
    # create the dataframe with from_records and specify the index
    return pd.DataFrame.from_records(l_seg, index=['time', 'device', 'obs_idx'])['value']\
             .unstack()
def process_hl7msg():
    df_list = []
    for file_name in file_list:
        segments = get segments
        df_list.append(parse_segments(segments))
    # use concat
    return pd.concat(df_list, axis=1).reset_index()
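To see what the from_records/unstack step produces, here is a small runnable check on a hand-made list of segment dictionaries (the values are hypothetical):

import pandas as pd

# hypothetical output of parsing one file's OBX segments
l_seg = [
    {"time": 100, "device": "device1", "obs_idx": "COL1", "value": 1.0},
    {"time": 100, "device": "device1", "obs_idx": "COL2", "value": 2.3},
    {"time": 200, "device": "device1", "obs_idx": "COL1", "value": 4.4},
]

# build the frame once, then pivot obs_idx into columns with unstack
wide = pd.DataFrame.from_records(l_seg, index=["time", "device", "obs_idx"])["value"].unstack()

# 'wide' is indexed by (time, device) with one column per obs_idx;
# the missing COL2 value at time 200 becomes NaN
print(wide)

Each parse_segments call returns a frame of this shape, so pd.concat(df_list, axis=1) in process_hl7msg only has to align the (time, device) indexes once instead of merging file by file.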
If it is not too big (I don't know this data source), you could even do everything in one go:
def process_hl7msg():
    l_values = []
    for file_name in file_list:
        segments = get segments
        # process segments
        for segment in segments:
            obx_timestamp = get obx_timestamp from segment
            obs_identifier = get ...
            observation_value = get observation value from segment
            device = get device info from segment
            # append a dictionary to a list
            l_values.append({'time': obx_timestamp, 'device': device,
                             'obs_idx': obs_identifier, 'value': observation_value})
    # return all at once
    return pd.DataFrame.from_records(l_values, index=['time', 'device', 'obs_idx'])['value']\
             .unstack()
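The updates mention an EVENT column, which the sketch above leaves out. One possible way to carry it through and write the final CSV is shown below; the sample records, the pivot_table call and the groupby(...).first() choice are my assumptions, not part of the original code:

import pandas as pd

# hypothetical records: only some OBX rows carry an EVENT
records = [
    {"Time": "20200420232613.6200+0530", "device": "device1", "EVENT": "start",
     "obx_idx": "COL1", "value": 1.0},
    {"Time": "20200420232613.6200+0530", "device": "device1",
     "obx_idx": "COL2", "value": 2.3},
    {"Time": "20200420232614.6200+0530", "device": "device1", "EVENT": "end",
     "obx_idx": "COL1", "value": 4.4},
    {"Time": "20200420232613.6200+0530", "device": "device2",
     "obx_idx": "COL3", "value": 44.0},
]

df = pd.DataFrame.from_records(records)

# one column per obx_idx, keyed by (Time, device)
values = df.pivot_table(index=["Time", "device"], columns="obx_idx", values="value")

# one EVENT per (Time, device): take the first non-null value in each group
events = df.groupby(["Time", "device"])["EVENT"].first()

# stitch EVENT and the value columns together and write the CSV
out = pd.concat([events, values], axis=1).reset_index()
out.to_csv("output.csv", index=False)

Keeping EVENT out of the unstack/pivot index avoids the issue in UPDATE 2, where records without an EVENT key end up on separate rows of the unstacked frame.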