Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ALS-7737: Better support for splitting genomic data by chromosome #137

Merged
merged 26 commits into from
Mar 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
337f7e9
Create VCFIndexBuilder, add logging to NewVcfLoader
ramari16 Feb 14, 2025
e585887
ALS-7737: Refactor NewVCFLoader to better support running in parallel
ramari16 Feb 24, 2025
ea8846a
ALS-7737: Add VcfLoader that splits by chromosome. todo: variant meta…
ramari16 Feb 26, 2025
e6338be
ALS-7737: Add VariantMetadataLoader that loads per chromosome
ramari16 Feb 26, 2025
04e47bf
ALS-7737: Fix bugs discovered by splitting genomic data by chromosome
ramari16 Feb 27, 2025
0d6ecc5
Remove duplicate code
ramari16 Feb 27, 2025
cb17239
Refactor genomic config
ramari16 Feb 27, 2025
ac16836
ALS-7737: Cleanup hard to follow loops
ramari16 Feb 28, 2025
7cd7900
Update build references for new vcf loader
ramari16 Mar 3, 2025
da65d98
Fix typo
ramari16 Mar 4, 2025
73f4c63
Update genomic config for bch
ramari16 Mar 4, 2025
b91771f
toggle filesharing
Mar 4, 2025
6a0cbfd
Fix spring config
ramari16 Mar 4, 2025
21ef8ae
Fix tests
ramari16 Mar 4, 2025
f625ea5
Update variant metadata loader implementation
ramari16 Mar 10, 2025
154002f
Attempt to fix variant explorer issue
ramari16 Mar 13, 2025
2ae548b
Attempt to fix variant explorer issue
ramari16 Mar 13, 2025
d4ad50e
Test coverage for variant explorer fix
ramari16 Mar 14, 2025
5988056
Test coverage for variant explorer fix
ramari16 Mar 14, 2025
4caf5e9
Changes from PR
ramari16 Mar 14, 2025
ea2b6aa
Merge from main
ramari16 Mar 14, 2025
83c3888
Fix string == bug that was accidentally working
ramari16 Mar 17, 2025
b056061
Fix integration tests
ramari16 Mar 18, 2025
a2d1e52
Add gic institute spring profile
ramari16 Mar 20, 2025
1c36a64
Add gic institute spring profile
ramari16 Mar 20, 2025
1afff4b
ALS-7737: Set gic site spring param
ramari16 Mar 21, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,11 @@ public Set<String> filterVariantSetForPatientSet(Set<String> variantSet, Collect
String bucketKey = variantSpec.split(",")[0] + ":" + (Integer.parseInt(variantSpec.split(",")[1])/1000);

//testBit uses inverted indexes include +2 offset for bookends
return _bucketMask.testBit(maxIndex - Collections.binarySearch(bucketList, bucketKey) + 2);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This bug has existed since forever. testBit with a negative index (when the bucket key was not found) sometimes returned true instead of false because of overflows

int bucketKeyIndex = Collections.binarySearch(bucketList, bucketKey);
if (bucketKeyIndex < 0) {
return false;
}
return _bucketMask.testBit(maxIndex - bucketKeyIndex + 2);
}).collect(Collectors.toSet());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class VariantMetadataIndex implements Serializable {
private final Map<String, FileBackedByteIndexedStorage<Integer, ConcurrentHashMap<String, String[]>> > indexMap = new HashMap<>();

public static final String VARIANT_METADATA_STORAGE_FILE_PREFIX = "VariantMetadataStorage";
private static String fileStoragePrefix = "/opt/local/hpds/all/" + VARIANT_METADATA_STORAGE_FILE_PREFIX;
private String fileStoragePrefix = "/opt/local/hpds/all/" + VARIANT_METADATA_STORAGE_FILE_PREFIX;

/**
* This map allows us to load millions of variants without re-writing the fbbis each time (which would blow up the disk space).
Expand Down
1 change: 1 addition & 0 deletions docker/pic-sure-hpds-etl/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ COPY --from=build /app/docker/pic-sure-hpds-etl/CSVLoader-jar-with-dependencies.
COPY --from=build /app/docker/pic-sure-hpds-etl/CSVLoaderNewSearch-jar-with-dependencies.jar .
COPY --from=build /app/docker/pic-sure-hpds-etl/CSVDumper-jar-with-dependencies.jar .
COPY --from=build /app/docker/pic-sure-hpds-etl/VCFLocalLoader-jar-with-dependencies.jar .
COPY --from=build /app/docker/pic-sure-hpds-etl/SplitChromosomeVcfLoader-jar-with-dependencies.jar .
COPY --from=build /app/docker/pic-sure-hpds-etl/VariantMetadataLoader-jar-with-dependencies.jar .
COPY --from=build /app/docker/pic-sure-hpds-etl/UnifiedVCFLocalLoader-jar-with-dependencies.jar .
COPY --from=build /app/docker/pic-sure-hpds-etl/MultialleleCounter-jar-with-dependencies.jar .
Expand Down
22 changes: 21 additions & 1 deletion etl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,26 @@
<goal>single</goal>
</goals>
</execution>
<execution>
<id>buildSplitChromosomeVcfLoader</id>
<configuration>
<archive>
<manifest>
<mainClass>edu.harvard.hms.dbmi.avillach.hpds.etl.genotype.SplitChromosomeVcfLoader</mainClass>
</manifest>
</archive>
<outputDirectory>${project.basedir}/../docker/pic-sure-hpds-etl</outputDirectory>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<classifier>SplitChromosomeVcfLoader</classifier>
<finalName>SplitChromosomeVcfLoader</finalName>
</configuration>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
<execution>
<id>buildPerPatientUnifiedVCFLocalLoader</id>
<configuration>
Expand Down Expand Up @@ -275,7 +295,7 @@
<configuration>
<archive>
<manifest>
<mainClass>edu.harvard.hms.dbmi.avillach.hpds.etl.genotype.VariantMetadataLoader</mainClass>
<mainClass>edu.harvard.hms.dbmi.avillach.hpds.etl.genotype.SplitChromosomeVariantMetadataLoader</mainClass>
</manifest>
</archive>
<outputDirectory>${project.basedir}/../docker/pic-sure-hpds-etl</outputDirectory>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package edu.harvard.hms.dbmi.avillach.hpds.data.genotype.util;
import edu.harvard.hms.dbmi.avillach.hpds.etl.genotype.NewVCFLoader;

import java.io.FileNotFoundException;
import java.io.IOException;

import static edu.harvard.hms.dbmi.avillach.hpds.etl.genotype.NewVCFLoader.convertInfoStoresToByteIndexed;

public class ConvertMergedInfoStoresToFBBIS {

public static void main(String[] args) throws FileNotFoundException, IOException {
public static void main(String[] args) throws IOException {

convertInfoStoresToByteIndexed();
new NewVCFLoader().convertInfoStoresToByteIndexed();

}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
package edu.harvard.hms.dbmi.avillach.hpds.data.genotype.util;

import static edu.harvard.hms.dbmi.avillach.hpds.etl.genotype.NewVCFLoader.splitInfoStoresByColumn;
import edu.harvard.hms.dbmi.avillach.hpds.etl.genotype.NewVCFLoader;

import java.io.FileNotFoundException;
import java.io.IOException;

import static edu.harvard.hms.dbmi.avillach.hpds.etl.genotype.NewVCFLoader.convertInfoStoresToByteIndexed;

public class ReSplitMergeInfoStores {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is used, will verify and delete


public static void main(String[] args) throws FileNotFoundException, IOException {
public static void main(String[] args) throws IOException {

NewVCFLoader newVCFLoader = new NewVCFLoader();

splitInfoStoresByColumn();
newVCFLoader.splitInfoStoresByColumn();

convertInfoStoresToByteIndexed();
newVCFLoader.convertInfoStoresToByteIndexed();

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,57 +22,77 @@

public class NewVCFLoader {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All changes in this file are to support extending it and removing all the static nonsense


public static final String DEFAULT_VCF_INDEX_FILE = "/opt/local/hpds/vcfIndex.tsv";
public static final String DEFAULT_STORAGE_DIR = "/opt/local/hpds/all";
public static final String DEFAULT_MERGED_DIR = "/opt/local/hpds/merged";
private static Logger logger = LoggerFactory.getLogger(NewVCFLoader.class);
private static File storageDir = null;
private static String storageDirStr = "/opt/local/hpds/all";
private static String mergedDirStr = "/opt/local/hpds/merged";

private static VariantIndexBuilder variantIndexBuilder = new VariantIndexBuilder();
protected File indexFile;
protected File storageDir;
protected String storageDirStr;
protected String mergedDirStr;

protected VariantIndexBuilder variantIndexBuilder;

// DO NOT CHANGE THIS unless you want to reload all the data everywhere.
private static int CHUNK_SIZE = 1000;
protected static int CHUNK_SIZE = 1000;

/**
* @param args - if testing, this should be an array ['vcfIndexFile path', 'output storage dir', 'merged dir'].
* by default this will be [/opt/local/hpds/vcfIndex.tsv, "/opt/local/hpds/all", "/opt/local/hpds/merged" ].
*/
public static void main(String[] args) throws FileNotFoundException, IOException {
File indexFile;
public static void main(String[] args) throws IOException {

NewVCFLoader vcfLoader;
if(args != null && args.length >= 3) {
logger.info("Reading parameters from input - this is a test");
indexFile = new File(args[0]);
storageDirStr = args[1];
storageDir = new File(args[1]);
mergedDirStr = args[2];
logger.info("Reading parameters from input");
vcfLoader = new NewVCFLoader(new File(args[0]), args[1], args[2]);
} else {
indexFile = new File("/opt/local/hpds/vcfIndex.tsv");
storageDir = new File(storageDirStr);
logger.info(args.length + " arguments provided");
logger.info("Using default values");
vcfLoader = new NewVCFLoader();
}
loadVCFs(indexFile);
vcfLoader.loadAndMerge();
}

private static ExecutorService chunkWriteEx = Executors.newFixedThreadPool(1);
protected void loadAndMerge() throws IOException {
createWalkers();
loadVCFs();
}

private static ConcurrentHashMap<String, InfoStore> infoStoreMap = new ConcurrentHashMap<String, InfoStore>();
public NewVCFLoader() {
this.indexFile = new File(DEFAULT_VCF_INDEX_FILE);
this.storageDirStr = DEFAULT_STORAGE_DIR;
this.storageDir = new File(DEFAULT_STORAGE_DIR);
this.mergedDirStr = DEFAULT_MERGED_DIR;
this.variantIndexBuilder = new VariantIndexBuilder();
}

private static HashMap<String, char[][]> zygosityMaskStrings;
public NewVCFLoader(File indexFile, String storageDir, String mergedDirStr) {
this.indexFile = indexFile;
this.storageDirStr = storageDir;
this.storageDir = new File(storageDir);
this.mergedDirStr = mergedDirStr;
this.variantIndexBuilder = new VariantIndexBuilder();
}

private static TreeMap<String, FileBackedJsonIndexStorage<Integer, ConcurrentHashMap<String, VariableVariantMasks>>> variantMaskStorage = new TreeMap<>();
protected ExecutorService chunkWriteEx = Executors.newFixedThreadPool(1);

private static long startTime;
protected ConcurrentHashMap<String, InfoStore> infoStoreMap = new ConcurrentHashMap<String, InfoStore>();

private static List<VCFWalker> walkers = new ArrayList<>();
protected HashMap<String, char[][]> zygosityMaskStrings;

private static boolean contigIsHemizygous;
protected TreeMap<String, FileBackedJsonIndexStorage<Integer, ConcurrentHashMap<String, VariableVariantMasks>>> variantMaskStorage = new TreeMap<>();

private static void loadVCFs(File indexFile) throws IOException {
protected long startTime;

protected List<VCFWalker> walkers = new ArrayList<>();

private boolean contigIsHemizygous;

protected void loadVCFs() throws IOException {
startTime = System.currentTimeMillis();
List<VCFIndexLine> vcfIndexLines = parseVCFIndex(indexFile);
for (VCFIndexLine line : vcfIndexLines) {
walkers.add(new VCFWalker(line));
}
TreeSet<Integer> allPatientIds = new TreeSet<Integer>();
TreeSet<Integer> allPatientIds = new TreeSet<>();

// Pull the INFO columns out of the headers for each walker and add all patient ids
walkers.stream().forEach(walker -> {
Expand Down Expand Up @@ -232,7 +252,14 @@ private static void loadVCFs(File indexFile) throws IOException {
saveVariantStore(store, variantMaskStorage);
}

private static String sampleIdsForMask(String[] sampleIds, VariantMask variantMask) {
private void createWalkers() {
List<VCFIndexLine> vcfIndexLines = parseVCFIndex(indexFile);
for (VCFIndexLine line : vcfIndexLines) {
walkers.add(new VCFWalker(line));
}
}

protected String sampleIdsForMask(String[] sampleIds, VariantMask variantMask) {
StringBuilder idList = new StringBuilder();
if (variantMask != null) {
if (variantMask instanceof VariantMaskBitmaskImpl) {
Expand All @@ -251,7 +278,7 @@ private static String sampleIdsForMask(String[] sampleIds, VariantMask variantMa
return idList.toString();
}

private static void flipChunk(String lastContigProcessed, int lastChunkProcessed, int currentChunk,
protected void flipChunk(String lastContigProcessed, int lastChunkProcessed, int currentChunk,
String currentContig, boolean isLastChunk, String currentLine) throws IOException, FileNotFoundException {
if (!currentContig.contentEquals(lastContigProcessed) || isLastChunk) {
if (infoStoreFlipped.get(lastContigProcessed) == null || !infoStoreFlipped.get(lastContigProcessed)) {
Expand Down Expand Up @@ -310,7 +337,7 @@ private static void flipChunk(String lastContigProcessed, int lastChunkProcessed
}
}

private static void saveVariantStore(VariantStore store,
protected void saveVariantStore(VariantStore store,
TreeMap<String, FileBackedJsonIndexStorage<Integer, ConcurrentHashMap<String, VariableVariantMasks>>> variantMaskStorage)
throws IOException, FileNotFoundException {
store.setVariantMaskStorage(variantMaskStorage);
Expand All @@ -323,7 +350,7 @@ private static void saveVariantStore(VariantStore store,
logger.debug("Done saving variant masks.");
}

private static void saveInfoStores() throws IOException, FileNotFoundException {
protected void saveInfoStores() throws IOException, FileNotFoundException {
logger.debug("Saving info" + (System.currentTimeMillis() - startTime) + " seconds");
try (FileOutputStream fos = new FileOutputStream(new File(storageDir, "infoStores.javabin"));
GZIPOutputStream gzos = new GZIPOutputStream(fos);
Expand All @@ -334,7 +361,7 @@ private static void saveInfoStores() throws IOException, FileNotFoundException {
logger.info("completed load in " + (System.currentTimeMillis() - startTime) + " seconds");
}

public static void splitInfoStoresByColumn() throws FileNotFoundException, IOException {
public void splitInfoStoresByColumn() throws FileNotFoundException, IOException {
logger.debug("Splitting" + (System.currentTimeMillis() - startTime) + " seconds");
try {
VCFPerPatientInfoStoreSplitter.splitAll(storageDir, new File(mergedDirStr));
Expand All @@ -344,7 +371,7 @@ public static void splitInfoStoresByColumn() throws FileNotFoundException, IOExc
logger.debug("Split" + (System.currentTimeMillis() - startTime) + " seconds");
}

public static void convertInfoStoresToByteIndexed() throws FileNotFoundException, IOException {
public void convertInfoStoresToByteIndexed() throws FileNotFoundException, IOException {
logger.debug("Converting" + (System.currentTimeMillis() - startTime) + " seconds");
try {
VCFPerPatientInfoStoreToFBBIISConverter.convertAll(mergedDirStr, storageDirStr);
Expand All @@ -354,7 +381,7 @@ public static void convertInfoStoresToByteIndexed() throws FileNotFoundException
logger.debug("Converted " + ((System.currentTimeMillis() - startTime) / 1000) + " seconds");
}

private static void shutdownChunkWriteExecutor() {
protected void shutdownChunkWriteExecutor() {
chunkWriteEx.shutdown();
while (!chunkWriteEx.isTerminated()) {
try {
Expand All @@ -377,16 +404,16 @@ private static ConcurrentHashMap<String, VariableVariantMasks> convertLoadingMap

static TreeMap<String, Boolean> infoStoreFlipped = new TreeMap<String, Boolean>();

private static class VCFWalker implements Comparable<VCFWalker> {
protected class VCFWalker implements Comparable<VCFWalker> {

private List<Integer> indices;
private Integer[] vcfOffsets;
private Integer[] bitmaskOffsets;
private HashMap<Integer, Integer> vcfIndexLookup;
private String currentLine;
private String[] currentLineSplit;
private BufferedReader vcfReader;
private VCFIndexLine vcfIndexLine;
protected List<Integer> indices;
protected Integer[] vcfOffsets;
protected Integer[] bitmaskOffsets;
protected HashMap<Integer, Integer> vcfIndexLookup;
protected String currentLine;
protected String[] currentLineSplit;
protected BufferedReader vcfReader;
protected VCFIndexLine vcfIndexLine;
boolean hasNext = true;
String currentContig;
Integer currentPosition;
Expand Down Expand Up @@ -469,7 +496,7 @@ private void setMasksForSample(char[][] zygosityMaskStrings, int index, int star
zygosityMaskStrings[patientZygosityIndex][bitmaskOffsets[index]] = '1';
}

private String currentSpecNotation() {
protected String currentSpecNotation() {
String[] variantInfo = currentLineSplit[7].split("[=;]");
String gene = "NULL";
String consequence = "NULL";
Expand Down Expand Up @@ -616,7 +643,7 @@ public int compareTo(VCFWalker o) {
private static final int SAMPLE_RELATIONSHIPS_COLUMN = 6;
private static final int RELATED_SAMPLE_IDS_COLUMN = 7;

private static class VCFIndexLine implements Comparable<VCFIndexLine> {
protected static class VCFIndexLine implements Comparable<VCFIndexLine> {
String vcfPath;
String contig;
boolean isAnnotated;
Expand Down
Loading