package edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.csv;

import edu.harvard.hms.dbmi.avillach.hpds.crypto.Crypto;
import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.PhenoCube;
import edu.harvard.hms.dbmi.avillach.hpds.etl.LoadingStore;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.Date;
+ @ Service
18
+ public class CSVLoaderService {
19
+
20
+ private static final Logger log = LoggerFactory .getLogger (CSVLoaderService .class );
21
+ private final LoadingStore store = new LoadingStore ();
22
+
23
+ @ Value ("${etl.hpds.directory:/opt/local/hpds/}" )
24
+ private String hpdsDirectory ;
25
+
26
+ @ Value ("${etl.rollup.enabled:true}" )
27
+ private boolean rollupEnabled ;
28
+
29
+ public void runEtlProcess () throws IOException {
30
+ log .info ("Starting ETL process... Rollup Enabled: {}" , rollupEnabled );
31
+
32
+ store .allObservationsStore = new RandomAccessFile (hpdsDirectory + "allObservationsStore.javabin" , "rw" );
33
+ initialLoad ();
34
+ store .saveStore (hpdsDirectory );
35
+
36
+ log .info ("ETL process completed." );
37
+ }
38
+
39
+ private void initialLoad () throws IOException {
40
+ Crypto .loadDefaultKey ();
41
+ Reader in = new FileReader (hpdsDirectory + "allConcepts.csv" );
42
+ Iterable <CSVRecord > records = CSVFormat .DEFAULT
43
+ .withFirstRecordAsHeader ()
44
+ .withSkipHeaderRecord (true )
45
+ .parse (new BufferedReader (in , 256 * 1024 ));
46
+
47
+ final PhenoCube [] currentConcept = new PhenoCube [1 ];
48
+ for (CSVRecord record : records ) {
49
+ processRecord (currentConcept , record );
50
+ }
51
+
52
+ }
53
+
54
+ private void processRecord (final PhenoCube [] currentConcept , CSVRecord record ) {
55
+ if (record .size () < 4 ) {
56
+ log .warn ("Skipping record #{} due to missing fields." , record .getRecordNumber ());
57
+ return ;
58
+ }
59
+
60
+ String conceptPath = CSVParserUtil .parseConceptPath (record , rollupEnabled );
61
+ String numericValue = record .get (CSVParserUtil .NUMERIC_VALUE );
62
+ boolean isAlpha = (numericValue == null || numericValue .isEmpty ());
63
+ String value = isAlpha ? record .get (CSVParserUtil .TEXT_VALUE ) : numericValue ;
64
+ currentConcept [0 ] = getPhenoCube (currentConcept [0 ], conceptPath , isAlpha );
65
+
66
+ if (value != null && !value .trim ().isEmpty () &&
67
+ ((isAlpha && currentConcept [0 ].vType == String .class ) || (!isAlpha && currentConcept [0 ].vType == Double .class ))) {
68
+ value = value .trim ();
69
+ currentConcept [0 ].setColumnWidth (isAlpha ? Math .max (currentConcept [0 ].getColumnWidth (), value .getBytes ().length ) : Double .BYTES );
70
+ int patientId = Integer .parseInt (record .get (CSVParserUtil .PATIENT_NUM ));
71
+ Date date = null ;
72
+ if (record .size () > 4 && record .get (CSVParserUtil .DATETIME ) != null && !record .get (CSVParserUtil .DATETIME ).isEmpty ()) {
73
+ date = new Date (Long .parseLong (record .get (CSVParserUtil .DATETIME )));
74
+ }
75
+ currentConcept [0 ].add (patientId , isAlpha ? value : Double .parseDouble (value ), date );
76
+ store .allIds .add (patientId );
77
+ }
78
+ }
79
+
80
+ private PhenoCube getPhenoCube (PhenoCube currentConcept , String conceptPath , boolean isAlpha ) {
81
+ if (currentConcept == null || !currentConcept .name .equals (conceptPath )) {
82
+ currentConcept = store .store .getIfPresent (conceptPath );
83
+ if (currentConcept == null ) {
84
+ log .info ("Writing - " + conceptPath );
85
+ // safe to invalidate and write store?
86
+ store .store .invalidateAll (); // force onremoval to free up cache per concept
87
+ store .store .cleanUp ();
88
+ currentConcept = new PhenoCube (conceptPath , isAlpha ? String .class : Double .class );
89
+ store .store .put (conceptPath , currentConcept );
90
+ }
91
+ }
92
+ return currentConcept ;
93
+ }
94
+ }