Skip to content

Commit d539fb5

Browse files
authored
ALS-7737: Better support for splitting genomic data by chromosome (#137)
1 parent d09075d commit d539fb5

26 files changed

+951 -221 lines changed

data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/BucketIndexBySample.java

+5-1
Original file line number | Diff line number | Diff line change
@@ -159,7 +159,11 @@ public Set<String> filterVariantSetForPatientSet(Set<String> variantSet, Collect
159159
String bucketKey = variantSpec.split(",")[0] + ":" + (Integer.parseInt(variantSpec.split(",")[1])/1000);
160160

161161
//testBit uses inverted indexes include +2 offset for bookends
162-
return _bucketMask.testBit(maxIndex - Collections.binarySearch(bucketList, bucketKey) + 2);
162+
int bucketKeyIndex = Collections.binarySearch(bucketList, bucketKey);
163+
if (bucketKeyIndex < 0) {
164+
return false;
165+
}
166+
return _bucketMask.testBit(maxIndex - bucketKeyIndex + 2);
163167
}).collect(Collectors.toSet());
164168
}
165169

data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/VariantMetadataIndex.java

+1-1
Original file line number | Diff line number | Diff line change
@@ -34,7 +34,7 @@ public class VariantMetadataIndex implements Serializable {
3434
private final Map<String, FileBackedByteIndexedStorage<Integer, ConcurrentHashMap<String, String[]>> > indexMap = new HashMap<>();
3535

3636
public static final String VARIANT_METADATA_STORAGE_FILE_PREFIX = "VariantMetadataStorage";
37-
private static String fileStoragePrefix = "/opt/local/hpds/all/" + VARIANT_METADATA_STORAGE_FILE_PREFIX;
37+
private String fileStoragePrefix = "/opt/local/hpds/all/" + VARIANT_METADATA_STORAGE_FILE_PREFIX;
3838

3939
/**
4040
* This map allows us to load millions of variants without re-writing the fbbis each time (which would blow up the disk space).

docker/pic-sure-hpds-etl/Dockerfile

+1
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,7 @@ COPY --from=build /app/docker/pic-sure-hpds-etl/CSVLoader-jar-with-dependencies.
2121
COPY --from=build /app/docker/pic-sure-hpds-etl/CSVLoaderNewSearch-jar-with-dependencies.jar .
2222
COPY --from=build /app/docker/pic-sure-hpds-etl/CSVDumper-jar-with-dependencies.jar .
2323
COPY --from=build /app/docker/pic-sure-hpds-etl/VCFLocalLoader-jar-with-dependencies.jar .
24+
COPY --from=build /app/docker/pic-sure-hpds-etl/SplitChromosomeVcfLoader-jar-with-dependencies.jar .
2425
COPY --from=build /app/docker/pic-sure-hpds-etl/VariantMetadataLoader-jar-with-dependencies.jar .
2526
COPY --from=build /app/docker/pic-sure-hpds-etl/UnifiedVCFLocalLoader-jar-with-dependencies.jar .
2627
COPY --from=build /app/docker/pic-sure-hpds-etl/MultialleleCounter-jar-with-dependencies.jar .

etl/pom.xml

+21-1
Original file line number | Diff line number | Diff line change
@@ -110,6 +110,26 @@
110110
<goal>single</goal>
111111
</goals>
112112
</execution>
113+
<execution>
114+
<id>buildSplitChromosomeVcfLoader</id>
115+
<configuration>
116+
<archive>
117+
<manifest>
118+
<mainClass>edu.harvard.hms.dbmi.avillach.hpds.etl.genotype.SplitChromosomeVcfLoader</mainClass>
119+
</manifest>
120+
</archive>
121+
<outputDirectory>${project.basedir}/../docker/pic-sure-hpds-etl</outputDirectory>
122+
<descriptorRefs>
123+
<descriptorRef>jar-with-dependencies</descriptorRef>
124+
</descriptorRefs>
125+
<classifier>SplitChromosomeVcfLoader</classifier>
126+
<finalName>SplitChromosomeVcfLoader</finalName>
127+
</configuration>
128+
<phase>package</phase>
129+
<goals>
130+
<goal>single</goal>
131+
</goals>
132+
</execution>
113133
<execution>
114134
<id>buildPerPatientUnifiedVCFLocalLoader</id>
115135
<configuration>
@@ -275,7 +295,7 @@
275295
<configuration>
276296
<archive>
277297
<manifest>
278-
<mainClass>edu.harvard.hms.dbmi.avillach.hpds.etl.genotype.VariantMetadataLoader</mainClass>
298+
<mainClass>edu.harvard.hms.dbmi.avillach.hpds.etl.genotype.SplitChromosomeVariantMetadataLoader</mainClass>
279299
</manifest>
280300
</archive>
281301
<outputDirectory>${project.basedir}/../docker/pic-sure-hpds-etl</outputDirectory>

etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/util/ConvertMergedInfoStoresToFBBIS.java

+4-3
Original file line number | Diff line number | Diff line change
@@ -1,14 +1,15 @@
11
package edu.harvard.hms.dbmi.avillach.hpds.data.genotype.util;
2+
import edu.harvard.hms.dbmi.avillach.hpds.etl.genotype.NewVCFLoader;
3+
24
import java.io.FileNotFoundException;
35
import java.io.IOException;
46

5-
import static edu.harvard.hms.dbmi.avillach.hpds.etl.genotype.NewVCFLoader.convertInfoStoresToByteIndexed;
67

78
public class ConvertMergedInfoStoresToFBBIS {
89

9-
public static void main(String[] args) throws FileNotFoundException, IOException {
10+
public static void main(String[] args) throws IOException {
1011

11-
convertInfoStoresToByteIndexed();
12+
new NewVCFLoader().convertInfoStoresToByteIndexed();
1213

1314
}
1415

etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/util/ReSplitMergeInfoStores.java

+6-6
Original file line number | Diff line number | Diff line change
@@ -1,19 +1,19 @@
11
package edu.harvard.hms.dbmi.avillach.hpds.data.genotype.util;
22

3-
import static edu.harvard.hms.dbmi.avillach.hpds.etl.genotype.NewVCFLoader.splitInfoStoresByColumn;
3+
import edu.harvard.hms.dbmi.avillach.hpds.etl.genotype.NewVCFLoader;
44

5-
import java.io.FileNotFoundException;
65
import java.io.IOException;
76

8-
import static edu.harvard.hms.dbmi.avillach.hpds.etl.genotype.NewVCFLoader.convertInfoStoresToByteIndexed;
97

108
public class ReSplitMergeInfoStores {
119

12-
public static void main(String[] args) throws FileNotFoundException, IOException {
10+
public static void main(String[] args) throws IOException {
11+
12+
NewVCFLoader newVCFLoader = new NewVCFLoader();
1313

14-
splitInfoStoresByColumn();
14+
newVCFLoader.splitInfoStoresByColumn();
1515

16-
convertInfoStoresToByteIndexed();
16+
newVCFLoader.convertInfoStoresToByteIndexed();
1717

1818
}
1919

etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/NewVCFLoader.java

+74-47
Original file line number | Diff line number | Diff line change
@@ -22,57 +22,77 @@
2222

2323
public class NewVCFLoader {
2424

25+
public static final String DEFAULT_VCF_INDEX_FILE = "/opt/local/hpds/vcfIndex.tsv";
26+
public static final String DEFAULT_STORAGE_DIR = "/opt/local/hpds/all";
27+
public static final String DEFAULT_MERGED_DIR = "/opt/local/hpds/merged";
2528
private static Logger logger = LoggerFactory.getLogger(NewVCFLoader.class);
26-
private static File storageDir = null;
27-
private static String storageDirStr = "/opt/local/hpds/all";
28-
private static String mergedDirStr = "/opt/local/hpds/merged";
2929

30-
private static VariantIndexBuilder variantIndexBuilder = new VariantIndexBuilder();
30+
protected File indexFile;
31+
protected File storageDir;
32+
protected String storageDirStr;
33+
protected String mergedDirStr;
34+
35+
protected VariantIndexBuilder variantIndexBuilder;
3136

3237
// DO NOT CHANGE THIS unless you want to reload all the data everywhere.
33-
private static int CHUNK_SIZE = 1000;
38+
protected static int CHUNK_SIZE = 1000;
3439

3540
/**
3641
* @param args - if testing, this should be an array ['vcfIndexFile path', 'output storage dir', 'merged dir'].
3742
* by default this will be [/opt/local/hpds/vcfIndex.tsv, "/opt/local/hpds/all", "/opt/local/hpds/merged" ].
3843
*/
39-
public static void main(String[] args) throws FileNotFoundException, IOException {
40-
41-
File indexFile;
44+
public static void main(String[] args) throws IOException {
45+
46+
NewVCFLoader vcfLoader;
4247
if(args != null && args.length >= 3) {
43-
logger.info("Reading parameters from input - this is a test");
44-
indexFile = new File(args[0]);
45-
storageDirStr = args[1];
46-
storageDir = new File(args[1]);
47-
mergedDirStr = args[2];
48+
logger.info("Reading parameters from input");
49+
vcfLoader = new NewVCFLoader(new File(args[0]), args[1], args[2]);
4850
} else {
49-
indexFile = new File("/opt/local/hpds/vcfIndex.tsv");
50-
storageDir = new File(storageDirStr);
51+
logger.info(args.length + " arguments provided");
52+
logger.info("Using default values");
53+
vcfLoader = new NewVCFLoader();
5154
}
52-
loadVCFs(indexFile);
55+
vcfLoader.loadAndMerge();
5356
}
5457

55-
private static ExecutorService chunkWriteEx = Executors.newFixedThreadPool(1);
58+
protected void loadAndMerge() throws IOException {
59+
createWalkers();
60+
loadVCFs();
61+
}
5662

57-
private static ConcurrentHashMap<String, InfoStore> infoStoreMap = new ConcurrentHashMap<String, InfoStore>();
63+
public NewVCFLoader() {
64+
this.indexFile = new File(DEFAULT_VCF_INDEX_FILE);
65+
this.storageDirStr = DEFAULT_STORAGE_DIR;
66+
this.storageDir = new File(DEFAULT_STORAGE_DIR);
67+
this.mergedDirStr = DEFAULT_MERGED_DIR;
68+
this.variantIndexBuilder = new VariantIndexBuilder();
69+
}
5870

59-
private static HashMap<String, char[][]> zygosityMaskStrings;
71+
public NewVCFLoader(File indexFile, String storageDir, String mergedDirStr) {
72+
this.indexFile = indexFile;
73+
this.storageDirStr = storageDir;
74+
this.storageDir = new File(storageDir);
75+
this.mergedDirStr = mergedDirStr;
76+
this.variantIndexBuilder = new VariantIndexBuilder();
77+
}
6078

61-
private static TreeMap<String, FileBackedJsonIndexStorage<Integer, ConcurrentHashMap<String, VariableVariantMasks>>> variantMaskStorage = new TreeMap<>();
79+
protected ExecutorService chunkWriteEx = Executors.newFixedThreadPool(1);
6280

63-
private static long startTime;
81+
protected ConcurrentHashMap<String, InfoStore> infoStoreMap = new ConcurrentHashMap<String, InfoStore>();
6482

65-
private static List<VCFWalker> walkers = new ArrayList<>();
83+
protected HashMap<String, char[][]> zygosityMaskStrings;
6684

67-
private static boolean contigIsHemizygous;
85+
protected TreeMap<String, FileBackedJsonIndexStorage<Integer, ConcurrentHashMap<String, VariableVariantMasks>>> variantMaskStorage = new TreeMap<>();
6886

69-
private static void loadVCFs(File indexFile) throws IOException {
87+
protected long startTime;
88+
89+
protected List<VCFWalker> walkers = new ArrayList<>();
90+
91+
private boolean contigIsHemizygous;
92+
93+
protected void loadVCFs() throws IOException {
7094
startTime = System.currentTimeMillis();
71-
List<VCFIndexLine> vcfIndexLines = parseVCFIndex(indexFile);
72-
for (VCFIndexLine line : vcfIndexLines) {
73-
walkers.add(new VCFWalker(line));
74-
}
75-
TreeSet<Integer> allPatientIds = new TreeSet<Integer>();
95+
TreeSet<Integer> allPatientIds = new TreeSet<>();
7696

7797
// Pull the INFO columns out of the headers for each walker and add all patient ids
7898
walkers.stream().forEach(walker -> {
@@ -232,7 +252,14 @@ private static void loadVCFs(File indexFile) throws IOException {
232252
saveVariantStore(store, variantMaskStorage);
233253
}
234254

235-
private static String sampleIdsForMask(String[] sampleIds, VariantMask variantMask) {
255+
private void createWalkers() {
256+
List<VCFIndexLine> vcfIndexLines = parseVCFIndex(indexFile);
257+
for (VCFIndexLine line : vcfIndexLines) {
258+
walkers.add(new VCFWalker(line));
259+
}
260+
}
261+
262+
protected String sampleIdsForMask(String[] sampleIds, VariantMask variantMask) {
236263
StringBuilder idList = new StringBuilder();
237264
if (variantMask != null) {
238265
if (variantMask instanceof VariantMaskBitmaskImpl) {
@@ -251,7 +278,7 @@ private static String sampleIdsForMask(String[] sampleIds, VariantMask variantMa
251278
return idList.toString();
252279
}
253280

254-
private static void flipChunk(String lastContigProcessed, int lastChunkProcessed, int currentChunk,
281+
protected void flipChunk(String lastContigProcessed, int lastChunkProcessed, int currentChunk,
255282
String currentContig, boolean isLastChunk, String currentLine) throws IOException, FileNotFoundException {
256283
if (!currentContig.contentEquals(lastContigProcessed) || isLastChunk) {
257284
if (infoStoreFlipped.get(lastContigProcessed) == null || !infoStoreFlipped.get(lastContigProcessed)) {
@@ -310,7 +337,7 @@ private static void flipChunk(String lastContigProcessed, int lastChunkProcessed
310337
}
311338
}
312339

313-
private static void saveVariantStore(VariantStore store,
340+
protected void saveVariantStore(VariantStore store,
314341
TreeMap<String, FileBackedJsonIndexStorage<Integer, ConcurrentHashMap<String, VariableVariantMasks>>> variantMaskStorage)
315342
throws IOException, FileNotFoundException {
316343
store.setVariantMaskStorage(variantMaskStorage);
@@ -323,7 +350,7 @@ private static void saveVariantStore(VariantStore store,
323350
logger.debug("Done saving variant masks.");
324351
}
325352

326-
private static void saveInfoStores() throws IOException, FileNotFoundException {
353+
protected void saveInfoStores() throws IOException, FileNotFoundException {
327354
logger.debug("Saving info" + (System.currentTimeMillis() - startTime) + " seconds");
328355
try (FileOutputStream fos = new FileOutputStream(new File(storageDir, "infoStores.javabin"));
329356
GZIPOutputStream gzos = new GZIPOutputStream(fos);
@@ -334,7 +361,7 @@ private static void saveInfoStores() throws IOException, FileNotFoundException {
334361
logger.info("completed load in " + (System.currentTimeMillis() - startTime) + " seconds");
335362
}
336363

337-
public static void splitInfoStoresByColumn() throws FileNotFoundException, IOException {
364+
public void splitInfoStoresByColumn() throws FileNotFoundException, IOException {
338365
logger.debug("Splitting" + (System.currentTimeMillis() - startTime) + " seconds");
339366
try {
340367
VCFPerPatientInfoStoreSplitter.splitAll(storageDir, new File(mergedDirStr));
@@ -344,7 +371,7 @@ public static void splitInfoStoresByColumn() throws FileNotFoundException, IOExc
344371
logger.debug("Split" + (System.currentTimeMillis() - startTime) + " seconds");
345372
}
346373

347-
public static void convertInfoStoresToByteIndexed() throws FileNotFoundException, IOException {
374+
public void convertInfoStoresToByteIndexed() throws FileNotFoundException, IOException {
348375
logger.debug("Converting" + (System.currentTimeMillis() - startTime) + " seconds");
349376
try {
350377
VCFPerPatientInfoStoreToFBBIISConverter.convertAll(mergedDirStr, storageDirStr);
@@ -354,7 +381,7 @@ public static void convertInfoStoresToByteIndexed() throws FileNotFoundException
354381
logger.debug("Converted " + ((System.currentTimeMillis() - startTime) / 1000) + " seconds");
355382
}
356383

357-
private static void shutdownChunkWriteExecutor() {
384+
protected void shutdownChunkWriteExecutor() {
358385
chunkWriteEx.shutdown();
359386
while (!chunkWriteEx.isTerminated()) {
360387
try {
@@ -377,16 +404,16 @@ private static ConcurrentHashMap<String, VariableVariantMasks> convertLoadingMap
377404

378405
static TreeMap<String, Boolean> infoStoreFlipped = new TreeMap<String, Boolean>();
379406

380-
private static class VCFWalker implements Comparable<VCFWalker> {
407+
protected class VCFWalker implements Comparable<VCFWalker> {
381408

382-
private List<Integer> indices;
383-
private Integer[] vcfOffsets;
384-
private Integer[] bitmaskOffsets;
385-
private HashMap<Integer, Integer> vcfIndexLookup;
386-
private String currentLine;
387-
private String[] currentLineSplit;
388-
private BufferedReader vcfReader;
389-
private VCFIndexLine vcfIndexLine;
409+
protected List<Integer> indices;
410+
protected Integer[] vcfOffsets;
411+
protected Integer[] bitmaskOffsets;
412+
protected HashMap<Integer, Integer> vcfIndexLookup;
413+
protected String currentLine;
414+
protected String[] currentLineSplit;
415+
protected BufferedReader vcfReader;
416+
protected VCFIndexLine vcfIndexLine;
390417
boolean hasNext = true;
391418
String currentContig;
392419
Integer currentPosition;
@@ -469,7 +496,7 @@ private void setMasksForSample(char[][] zygosityMaskStrings, int index, int star
469496
zygosityMaskStrings[patientZygosityIndex][bitmaskOffsets[index]] = '1';
470497
}
471498

472-
private String currentSpecNotation() {
499+
protected String currentSpecNotation() {
473500
String[] variantInfo = currentLineSplit[7].split("[=;]");
474501
String gene = "NULL";
475502
String consequence = "NULL";
@@ -616,7 +643,7 @@ public int compareTo(VCFWalker o) {
616643
private static final int SAMPLE_RELATIONSHIPS_COLUMN = 6;
617644
private static final int RELATED_SAMPLE_IDS_COLUMN = 7;
618645

619-
private static class VCFIndexLine implements Comparable<VCFIndexLine> {
646+
protected static class VCFIndexLine implements Comparable<VCFIndexLine> {
620647
String vcfPath;
621648
String contig;
622649
boolean isAnnotated;

0 commit comments

Comments (0)