
ALS-7737: Better support for splitting genomic data by chromosome #137

Merged Mar 21, 2025 · 26 commits

Commits
337f7e9
Create VCFIndexBuilder, add logging to NewVcfLoader
ramari16 Feb 14, 2025
e585887
ALS-7737: Refactor NewVCFLoader to better support running in parallel
ramari16 Feb 24, 2025
ea8846a
ALS-7737: Add VcfLoader that splits by chromosome. todo: variant metadata in a similar fashion
ramari16 Feb 26, 2025
e6338be
ALS-7737: Add VariantMetadataLoader that loads per chromosome
ramari16 Feb 26, 2025
04e47bf
ALS-7737: Fix bugs discovered by splitting genomic data by chromosome
ramari16 Feb 27, 2025
0d6ecc5
Remove duplicate code
ramari16 Feb 27, 2025
cb17239
Refactor genomic config
ramari16 Feb 27, 2025
ac16836
ALS-7737: Cleanup hard to follow loops
ramari16 Feb 28, 2025
7cd7900
Update build references for new vcf loader
ramari16 Mar 3, 2025
da65d98
Fix typo
ramari16 Mar 4, 2025
73f4c63
Update genomic config for bch
ramari16 Mar 4, 2025
b91771f
toggle filesharing
Mar 4, 2025
6a0cbfd
Fix spring config
ramari16 Mar 4, 2025
21ef8ae
Fix tests
ramari16 Mar 4, 2025
f625ea5
Update variant metadata loader implementation
ramari16 Mar 10, 2025
154002f
Attempt to fix variant explorer issue
ramari16 Mar 13, 2025
2ae548b
Attempt to fix variant explorer issue
ramari16 Mar 13, 2025
d4ad50e
Test coverage for variant explorer fix
ramari16 Mar 14, 2025
5988056
Test coverage for variant explorer fix
ramari16 Mar 14, 2025
4caf5e9
Changes from PR
ramari16 Mar 14, 2025
ea2b6aa
Merge from main
ramari16 Mar 14, 2025
83c3888
Fix string == bug that was actually working
ramari16 Mar 17, 2025
b056061
Fix integration tests
ramari16 Mar 18, 2025
a2d1e52
Add gic institute spring profile
ramari16 Mar 20, 2025
1c36a64
Add gic institute spring profile
ramari16 Mar 20, 2025
1afff4b
ALS-7737: Set gic site spring param
ramari16 Mar 21, 2025
Changes from 1 commit:
ALS-7737: Add VcfLoader that splits by chromosome. todo: variant metadata in a similar fashion
ramari16 committed Feb 26, 2025
commit ea8846ac88af6bc51c62e77feada37184d9203c9
File: NewVCFLoader.java
@@ -27,15 +27,15 @@ public class NewVCFLoader {
public static final String DEFAULT_MERGED_DIR = "/opt/local/hpds/merged";
private static Logger logger = LoggerFactory.getLogger(NewVCFLoader.class);

-private File indexFile;
-private File storageDir;
-private String storageDirStr;
-private String mergedDirStr;
+protected File indexFile;
+protected File storageDir;
+protected String storageDirStr;
+protected String mergedDirStr;

-private VariantIndexBuilder variantIndexBuilder;
+protected VariantIndexBuilder variantIndexBuilder;

// DO NOT CHANGE THIS unless you want to reload all the data everywhere.
-private static int CHUNK_SIZE = 1000;
+protected static int CHUNK_SIZE = 1000;

/**
* @param args - if testing, this should be an array ['vcfIndexFile path', 'output storage dir', 'merged dir'].
@@ -52,8 +52,12 @@ public static void main(String[] args) throws IOException {
logger.info("Using default values");
vcfLoader = new NewVCFLoader();
}
-vcfLoader.loadVCFs();
+vcfLoader.loadAndMerge();
}

+protected void loadAndMerge() throws IOException {
+createWalkers();
+loadVCFs();
+}

public NewVCFLoader() {
@@ -72,27 +76,23 @@ public NewVCFLoader(File indexFile, String storageDir, String mergedDirStr) {
this.variantIndexBuilder = new VariantIndexBuilder();
}

-private ExecutorService chunkWriteEx = Executors.newFixedThreadPool(1);
+protected ExecutorService chunkWriteEx = Executors.newFixedThreadPool(1);

-private ConcurrentHashMap<String, InfoStore> infoStoreMap = new ConcurrentHashMap<String, InfoStore>();
+protected ConcurrentHashMap<String, InfoStore> infoStoreMap = new ConcurrentHashMap<String, InfoStore>();

-private HashMap<String, char[][]> zygosityMaskStrings;
+protected HashMap<String, char[][]> zygosityMaskStrings;

-private TreeMap<String, FileBackedJsonIndexStorage<Integer, ConcurrentHashMap<String, VariableVariantMasks>>> variantMaskStorage = new TreeMap<>();
+protected TreeMap<String, FileBackedJsonIndexStorage<Integer, ConcurrentHashMap<String, VariableVariantMasks>>> variantMaskStorage = new TreeMap<>();

-private long startTime;
+protected long startTime;

-private List<VCFWalker> walkers = new ArrayList<>();
+protected List<VCFWalker> walkers = new ArrayList<>();

-private boolean contigIsHemizygous;
-
-private void loadVCFs() throws IOException {
+protected void loadVCFs() throws IOException {
startTime = System.currentTimeMillis();
-List<VCFIndexLine> vcfIndexLines = parseVCFIndex(indexFile);
-for (VCFIndexLine line : vcfIndexLines) {
-walkers.add(new VCFWalker(line));
-}
-TreeSet<Integer> allPatientIds = new TreeSet<Integer>();
+TreeSet<Integer> allPatientIds = new TreeSet<>();

// Pull the INFO columns out of the headers for each walker and add all patient ids
walkers.stream().forEach(walker -> {
@@ -252,7 +252,14 @@ private void loadVCFs() throws IOException {
saveVariantStore(store, variantMaskStorage);
}

-private String sampleIdsForMask(String[] sampleIds, VariantMask variantMask) {
+private void createWalkers() {
+List<VCFIndexLine> vcfIndexLines = parseVCFIndex(indexFile);
+for (VCFIndexLine line : vcfIndexLines) {
+walkers.add(new VCFWalker(line));
+}
+}
+
+protected String sampleIdsForMask(String[] sampleIds, VariantMask variantMask) {
StringBuilder idList = new StringBuilder();
if (variantMask != null) {
if (variantMask instanceof VariantMaskBitmaskImpl) {
@@ -271,7 +278,7 @@ private String sampleIdsForMask(String[] sampleIds, VariantMask variantMask) {
return idList.toString();
}

-private void flipChunk(String lastContigProcessed, int lastChunkProcessed, int currentChunk,
+protected void flipChunk(String lastContigProcessed, int lastChunkProcessed, int currentChunk,
String currentContig, boolean isLastChunk, String currentLine) throws IOException, FileNotFoundException {
if (!currentContig.contentEquals(lastContigProcessed) || isLastChunk) {
if (infoStoreFlipped.get(lastContigProcessed) == null || !infoStoreFlipped.get(lastContigProcessed)) {
@@ -330,7 +337,7 @@ private void flipChunk(String lastContigProcessed, int lastChunkProcessed, int c
}
}

-private void saveVariantStore(VariantStore store,
+protected void saveVariantStore(VariantStore store,
TreeMap<String, FileBackedJsonIndexStorage<Integer, ConcurrentHashMap<String, VariableVariantMasks>>> variantMaskStorage)
throws IOException, FileNotFoundException {
store.setVariantMaskStorage(variantMaskStorage);
@@ -343,7 +350,7 @@ private void saveVariantStore(VariantStore store,
logger.debug("Done saving variant masks.");
}

-private void saveInfoStores() throws IOException, FileNotFoundException {
+protected void saveInfoStores() throws IOException, FileNotFoundException {
logger.debug("Saving info" + (System.currentTimeMillis() - startTime) + " seconds");
try (FileOutputStream fos = new FileOutputStream(new File(storageDir, "infoStores.javabin"));
GZIPOutputStream gzos = new GZIPOutputStream(fos);
@@ -374,7 +381,7 @@ public void convertInfoStoresToByteIndexed() throws FileNotFoundException, IOExc
logger.debug("Converted " + ((System.currentTimeMillis() - startTime) / 1000) + " seconds");
}

-private void shutdownChunkWriteExecutor() {
+protected void shutdownChunkWriteExecutor() {
chunkWriteEx.shutdown();
while (!chunkWriteEx.isTerminated()) {
try {
@@ -397,16 +404,16 @@ private static ConcurrentHashMap<String, VariableVariantMasks> convertLoadingMap

static TreeMap<String, Boolean> infoStoreFlipped = new TreeMap<String, Boolean>();

-private class VCFWalker implements Comparable<VCFWalker> {
+protected class VCFWalker implements Comparable<VCFWalker> {

-private List<Integer> indices;
-private Integer[] vcfOffsets;
-private Integer[] bitmaskOffsets;
-private HashMap<Integer, Integer> vcfIndexLookup;
-private String currentLine;
-private String[] currentLineSplit;
-private BufferedReader vcfReader;
-private VCFIndexLine vcfIndexLine;
+protected List<Integer> indices;
+protected Integer[] vcfOffsets;
+protected Integer[] bitmaskOffsets;
+protected HashMap<Integer, Integer> vcfIndexLookup;
+protected String currentLine;
+protected String[] currentLineSplit;
+protected BufferedReader vcfReader;
+protected VCFIndexLine vcfIndexLine;
boolean hasNext = true;
String currentContig;
Integer currentPosition;
@@ -489,7 +496,7 @@ private void setMasksForSample(char[][] zygosityMaskStrings, int index, int star
zygosityMaskStrings[patientZygosityIndex][bitmaskOffsets[index]] = '1';
}

-private String currentSpecNotation() {
+protected String currentSpecNotation() {
String[] variantInfo = currentLineSplit[7].split("[=;]");
String gene = "NULL";
String consequence = "NULL";
@@ -636,7 +643,7 @@ public int compareTo(VCFWalker o) {
private static final int SAMPLE_RELATIONSHIPS_COLUMN = 6;
private static final int RELATED_SAMPLE_IDS_COLUMN = 7;

-private static class VCFIndexLine implements Comparable<VCFIndexLine> {
+protected static class VCFIndexLine implements Comparable<VCFIndexLine> {
String vcfPath;
String contig;
boolean isAnnotated;
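Taken together, these hunks turn NewVCFLoader into a template: loadAndMerge() now sequences createWalkers() and then loadVCFs(), and the formerly private state (walkers, storage paths, chunk executor, mask storage) is protected so a subclass can override individual steps. A minimal sketch of what the widened visibility permits — the subclass below is hypothetical, invented purely for illustration; the PR's real subclass is SplitChromosomeVcfLoader in the next file:

import edu.harvard.hms.dbmi.avillach.hpds.etl.genotype.NewVCFLoader;

import java.io.IOException;

// Hypothetical subclass, for illustration only: 'walkers' and 'storageDirStr'
// are reachable here because this commit widened them from private to protected.
public class CountingVcfLoader extends NewVCFLoader {

    @Override
    protected void loadVCFs() throws IOException {
        // createWalkers() has already run by the time loadAndMerge() calls this.
        System.out.println("Loading " + walkers.size() + " walker(s) into " + storageDirStr);
        super.loadVCFs();
    }

    public static void main(String[] args) throws IOException {
        new CountingVcfLoader().loadAndMerge(); // template method added by this commit
    }
}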
File: SplitChromosomeVcfLoader.java (new file)
@@ -0,0 +1,235 @@
package edu.harvard.hms.dbmi.avillach.hpds.etl.genotype;

import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariableVariantMasks;
import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantMask;
import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantStore;
import edu.harvard.hms.dbmi.avillach.hpds.storage.FileBackedByteIndexedStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class SplitChromosomeVcfLoader extends NewVCFLoader {

private static Logger logger = LoggerFactory.getLogger(SplitChromosomeVcfLoader.class);
private String[] allSampleIds;
private Integer[] patientIds;
private TreeSet<Integer> allPatientIds;

private final String baseStorageDir;
private final String baseMergeDir;

public SplitChromosomeVcfLoader(File file, String baseStorageDir, String baseMergeDir) {
super(file, baseStorageDir, baseMergeDir);
this.baseStorageDir = baseStorageDir;
this.baseMergeDir = baseMergeDir;
}

public SplitChromosomeVcfLoader() {
super();
this.baseStorageDir = DEFAULT_STORAGE_DIR;
this.baseMergeDir = DEFAULT_MERGED_DIR;
}



public static void main(String[] args) throws IOException {
NewVCFLoader vcfLoader;
if(args != null && args.length >= 3) {
logger.info("Reading parameters from input");
vcfLoader = new SplitChromosomeVcfLoader(new File(args[0]), args[1], args[2]);
} else {
logger.info(args.length + " arguments provided");
logger.info("Using default values");
vcfLoader = new NewVCFLoader();
}
vcfLoader.loadAndMerge();

vcfLoader.shutdownChunkWriteExecutor();
}


protected void loadVCFs() throws IOException {
startTime = System.currentTimeMillis();
allPatientIds = new TreeSet<>();

// Pull the INFO columns out of the headers for each walker and add all patient ids
walkers.stream().forEach(walker -> {
try {
logger.info("Reading headers of VCF [" + walker.vcfIndexLine.vcfPath + "]");
walker.readHeaders(infoStoreMap);
allPatientIds.addAll(Arrays.asList(walker.vcfIndexLine.patientIds));
} catch (IOException e) {
logger.error("Error while reading headers of VCF [" + walker.vcfIndexLine.vcfPath + "]", e);
System.exit(-1);
}
});

patientIds = allPatientIds.toArray(new Integer[0]);
allSampleIds = new String[allPatientIds.size()];

walkers.parallelStream().forEach(walker -> {
logger.info("Setting bitmask offsets for VCF [" + walker.vcfIndexLine.vcfPath + "]");
walker.setBitmaskOffsets(patientIds);
for (int x = 0; x < walker.vcfIndexLine.sampleIds.length; x++) {
allSampleIds[Arrays.binarySearch(patientIds,
walker.vcfIndexLine.patientIds[x])] = walker.vcfIndexLine.sampleIds[x];
}
});

for (VCFWalker walker : walkers) {
chunkWriteEx = Executors.newFixedThreadPool(1);
storageDirStr = baseStorageDir + "/" + walker.currentContig;
storageDir = new File(storageDirStr);
storageDir.mkdirs();
mergedDirStr = baseMergeDir + "/" + walker.currentContig;
new File(mergedDirStr).mkdirs();
variantIndexBuilder = new VariantIndexBuilder();
variantMaskStorage = new TreeMap<>();
walkers = new ArrayList<>();
walkers.add(walker);
loadSingleContig();
}
}

private void loadSingleContig() throws IOException {
VariantStore store = new VariantStore();
store.setPatientIds(allPatientIds.stream().map((id) -> {
return id.toString();
}).collect(Collectors.toList()).toArray(new String[0]));

String lastContigProcessed = null;
int lastChunkProcessed = 0;
int currentChunk = 0;
String[] currentContig = new String[1];
int[] currentPosition = { -1 };
String[] currentRef = new String[1];
String[] currentAlt = new String[1];

zygosityMaskStrings = new HashMap<String/* variantSpec */, char[][]/* string bitmasks */>();

List<Integer> positionsProcessedInChunk = new ArrayList<>();
while (walkers.stream().anyMatch(walker -> {
return walker.hasNext;
})) {
Collections.sort(walkers);
VCFWalker lowestWalker = walkers.get(0);
String currentSpecNotation = lowestWalker.currentSpecNotation();
currentContig[0] = lowestWalker.currentContig;
currentPosition[0] = lowestWalker.currentPosition;
currentRef[0] = lowestWalker.currentRef;
currentAlt[0] = lowestWalker.currentAlt;
currentChunk = lowestWalker.currentPosition / CHUNK_SIZE;
positionsProcessedInChunk.add(currentPosition[0]);

if (lastContigProcessed == null) {
lastContigProcessed = lowestWalker.currentContig;
}

flipChunk(lastContigProcessed, lastChunkProcessed, currentChunk, currentContig[0], false,
lowestWalker.currentLine);
lastContigProcessed = lowestWalker.currentContig;
lastChunkProcessed = currentChunk;

char[][][] maskStringsForVariantSpec = { zygosityMaskStrings.get(currentSpecNotation) };
if (maskStringsForVariantSpec[0] == null) {
maskStringsForVariantSpec[0] = new char[7][allPatientIds.size()];
for (int x = 0; x < maskStringsForVariantSpec[0].length; x++) {
maskStringsForVariantSpec[0][x] = new char[allPatientIds.size()];
for (int y = 0; y < allPatientIds.size(); y++) {
maskStringsForVariantSpec[0][x][y] = '0';
}
}
}
walkers.stream().filter((walker) -> {
return walker.currentPosition == currentPosition[0] && walker.currentAlt == currentAlt[0]
&& walker.currentRef == currentRef[0] && walker.currentContig.contentEquals(currentContig[0]);
}).forEach((walker) -> {
walker.updateRecords(maskStringsForVariantSpec[0], infoStoreMap);
try {
walker.nextLine();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
});
zygosityMaskStrings.put(currentSpecNotation, maskStringsForVariantSpec[0]);
walkers = walkers.stream().filter((walker) -> {
return walker.hasNext;
}).collect(Collectors.toList());
}

flipChunk(lastContigProcessed, lastChunkProcessed, currentChunk, currentContig[0], true, null);

shutdownChunkWriteExecutor();

saveInfoStores();

splitInfoStoresByColumn();

convertInfoStoresToByteIndexed();

if (logger.isDebugEnabled()) {
// Log out the first and last 50 variants
int[] count = { 0 };
for (String contig : store.getVariantMaskStorage().keySet()) {
ArrayList<Integer> chunkIds = new ArrayList<>();
FileBackedByteIndexedStorage<Integer, ConcurrentHashMap<String, VariableVariantMasks>> chromosomeStorage = store.getVariantMaskStorage()
.get(contig);
if (chromosomeStorage != null) {
// print out the top and bottom 50 variants in the store (that have masks)
chunkIds.addAll(chromosomeStorage.keys());
for (Integer chunkId : chunkIds) {
for (String variantSpec : chromosomeStorage.get(chunkId).keySet()) {
count[0]++;
VariableVariantMasks variantMasks = chromosomeStorage.get(chunkId).get(variantSpec);
if (variantMasks != null) {
VariantMask heterozygousMask = variantMasks.heterozygousMask;
String heteroIdList = sampleIdsForMask(allSampleIds, heterozygousMask);
VariantMask homozygousMask = variantMasks.homozygousMask;
String homoIdList = sampleIdsForMask(allSampleIds, homozygousMask);

if (!heteroIdList.isEmpty() && heteroIdList.length() < 1000)
logger.debug(variantSpec + " : heterozygous : " + heteroIdList);
if (!homoIdList.isEmpty() && homoIdList.length() < 1000)
logger.debug(variantSpec + " : homozygous : " + homoIdList);
}
}
if (count[0] > 50)
break;
}

count[0] = 0;
for (int x = chunkIds.size() - 1; x > 0; x--) {
int chunkId = chunkIds.get(x);
chromosomeStorage.get(chunkId).keySet().forEach((variantSpec) -> {
count[0]++;
VariableVariantMasks variantMasks = chromosomeStorage.get(chunkId).get(variantSpec);
if (variantMasks != null) {
VariantMask heterozygousMask = variantMasks.heterozygousMask;
String heteroIdList = sampleIdsForMask(allSampleIds, heterozygousMask);
VariantMask homozygousMask = variantMasks.homozygousMask;
String homoIdList = sampleIdsForMask(allSampleIds, homozygousMask);

if (!heteroIdList.isEmpty() && heteroIdList.length() < 1000)
logger.debug(variantSpec + " : heterozygous : " + heteroIdList);
if (!homoIdList.isEmpty() && homoIdList.length() < 1000)
logger.debug(variantSpec + " : homozygous : " + homoIdList);
}
});
if (count[0] > 50)
break;
}
}
}
}

store.setVariantSpecIndex(variantIndexBuilder.getVariantSpecIndex().toArray(new String[0]));
saveVariantStore(store, variantMaskStorage);
}
}
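SplitChromosomeVcfLoader reuses the parent's header pass, then loops over the walkers one contig at a time, pointing storageDir and mergedDir at <base>/<contig>/ and resetting the chunk executor, variant index builder, and mask storage before each loadSingleContig() call, so every chromosome ends up in its own self-contained store. A usage sketch — all three paths are illustrative examples, not values from the PR; only the three-argument calling convention is real:

import edu.harvard.hms.dbmi.avillach.hpds.etl.genotype.SplitChromosomeVcfLoader;

import java.io.IOException;

public class SplitLoaderExample {
    public static void main(String[] args) throws IOException {
        // Argument order matches the loader's main(): index file, storage dir, merged dir.
        SplitChromosomeVcfLoader.main(new String[] {
            "/opt/local/hpds/vcfIndex.tsv",  // TSV index describing the input VCFs
            "/opt/local/hpds/all",           // per-contig stores written to all/<contig>/
            "/opt/local/hpds/merged"         // per-contig scratch space in merged/<contig>/
        });
        // Afterwards each contig directory (e.g. all/chr21/) holds its own
        // variant masks and infoStores.javabin, ready to back one processor node.
    }
}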
File: Spring configuration for the GenomicProcessor beans
@@ -34,11 +34,19 @@ public GenomicProcessor localGenomicProcessor() {
@Bean(name = "localDistributedGenomicProcessor")
@ConditionalOnProperty(prefix = "hpds.genomicProcessor", name = "impl", havingValue = "localDistributed")
public GenomicProcessor localDistributedGenomicProcessor() {
-List<GenomicProcessor> genomicProcessors = Flux.range(1, 22)
-.flatMap(i -> Mono.fromCallable(() -> (GenomicProcessor) new GenomicProcessorNodeImpl(hpdsGenomicDataDirectory + "/" + i + "/")).subscribeOn(Schedulers.boundedElastic()))
+// assumed for now that all first level directories contain a genomic dataset for a group of studies
+File[] directories = new File(hpdsGenomicDataDirectory).listFiles(File::isDirectory);
+if (directories.length > 50) {
+throw new IllegalArgumentException("Number of chromosome partitions exceeds maximum of 50 (" + directories.length + ")");
+}
+
+List<GenomicProcessor> genomicProcessors = Flux.fromArray(directories)
+.flatMap(i -> Mono.fromCallable(() -> {
+return (GenomicProcessor) new GenomicProcessorNodeImpl(i.getAbsolutePath() + "/");
+}).subscribeOn(Schedulers.boundedElastic()))
.collectList()
.block();
-genomicProcessors.add(new GenomicProcessorNodeImpl(hpdsGenomicDataDirectory + "/X/"));
+//genomicProcessors.add(new GenomicProcessorNodeImpl(hpdsGenomicDataDirectory + "/X/"));
return new GenomicProcessorParentImpl(genomicProcessors);
}
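The localDistributed bean no longer hard-codes chromosomes 1–22 plus a separate /X/ node (the latter is now commented out): it builds one GenomicProcessorNodeImpl per first-level directory it finds, capped at 50 partitions. A standalone sketch of that discovery rule, with an example base path — the real bean reads hpdsGenomicDataDirectory and wraps each node in a Flux:

import java.io.File;
import java.util.ArrayList;
import java.util.List;

// Sketch of the new partition-discovery rule; the cap of 50 mirrors the bean
// above, while the base path here is only an example.
public class PartitionScanExample {
    public static void main(String[] args) {
        File base = new File("/opt/local/hpds/all");
        File[] directories = base.listFiles(File::isDirectory);
        if (directories == null || directories.length == 0) {
            throw new IllegalStateException("No genomic partitions under " + base);
        }
        if (directories.length > 50) {
            throw new IllegalArgumentException(
                "Number of chromosome partitions exceeds maximum of 50 (" + directories.length + ")");
        }
        List<String> processorRoots = new ArrayList<>();
        for (File dir : directories) {
            // The real bean builds one GenomicProcessorNodeImpl per root.
            processorRoots.add(dir.getAbsolutePath() + "/");
        }
        processorRoots.forEach(System.out::println);
    }
}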

File: integration test properties
@@ -4,6 +4,6 @@ LARGE_TASK_THREADS = 1
ID_BATCH_SIZE=1000
VCF_EXCERPT_ENABLED=true

-hpds.genomicProcessor.impl=local
-HPDS_GENOMIC_DATA_DIRECTORY=target/
+hpds.genomicProcessor.impl=localDistributed
+HPDS_GENOMIC_DATA_DIRECTORY=target/all/
HPDS_DATA_DIRECTORY=target/test-classes/phenotypic/
File: BuildIntegrationTestEnvironment.java
@@ -3,29 +3,41 @@
import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.BucketIndexBySample;
import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantStore;
import edu.harvard.hms.dbmi.avillach.hpds.etl.genotype.NewVCFLoader;
+import edu.harvard.hms.dbmi.avillach.hpds.etl.genotype.SplitChromosomeVcfLoader;
import edu.harvard.hms.dbmi.avillach.hpds.etl.genotype.VariantMetadataLoader;
import edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.CSVLoader;

import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;

public enum BuildIntegrationTestEnvironment {
INSTANCE;

private static final String PHENOTYPIC_DATA_DIRECTORY = "target/test-classes/phenotypic/";
private final String VCF_INDEX_FILE = "./src/test/resources/test_vcfIndex.tsv";
private final String STORAGE_DIR = "./target/";
private final String STORAGE_DIR = "./target/all/";
private final String MERGED_DIR = "./target/merged/";

public String binFile = "target/VariantMetadata.javabin";

BuildIntegrationTestEnvironment() {
try {
-NewVCFLoader.main(new String[] {VCF_INDEX_FILE, STORAGE_DIR, MERGED_DIR});
+SplitChromosomeVcfLoader.main(new String[] {VCF_INDEX_FILE, STORAGE_DIR, MERGED_DIR});
CSVLoader.main(new String[] {PHENOTYPIC_DATA_DIRECTORY});
VariantMetadataLoader.main(new String[] {"./src/test/resources/test_vcfIndex.tsv", binFile, "target/VariantMetadataStorage.bin"});
-VariantStore variantStore = VariantStore.readInstance(STORAGE_DIR);
-BucketIndexBySample bucketIndexBySample = new BucketIndexBySample(variantStore, STORAGE_DIR);
-} catch (IOException | ClassNotFoundException e) {
+List.of("chr4", "chr20", "chr21").forEach(dir -> {
+VariantStore variantStore = null;
+try {
+variantStore = VariantStore.readInstance(STORAGE_DIR + dir + "/");
+BucketIndexBySample bucketIndexBySample = new BucketIndexBySample(variantStore, STORAGE_DIR + dir + "/");
+} catch (IOException e) {
+throw new UncheckedIOException(e);
+} catch (ClassNotFoundException e) {
+throw new RuntimeException(e);
+}
+});
+} catch (IOException e) {
throw new RuntimeException(e);
}