From 337f7e9e6cedbeda6f997c1abac82d379ecfc944 Mon Sep 17 00:00:00 2001
From: Ryan Amari
Date: Fri, 14 Feb 2025 09:48:32 -0500
Subject: [PATCH 01/25] Create VCFIndexBuilder, add logging to NewVCFLoader

---
 .../hpds/etl/genotype/NewVCFLoader.java       |   2 +
 .../hpds/etl/genotype/VCFIndexBuilder.java    | 128 ++++++++++++++++++
 2 files changed, 130 insertions(+)
 create mode 100644 etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/VCFIndexBuilder.java

diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/NewVCFLoader.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/NewVCFLoader.java
index 1aed473e..66331f1c 100644
--- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/NewVCFLoader.java
+++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/NewVCFLoader.java
@@ -46,6 +46,8 @@ public static void main(String[] args) throws FileNotFoundException, IOException
 			storageDir = new File(args[1]);
 			mergedDirStr = args[2];
 		} else {
+			logger.info(args.length + " arguments provided");
+			logger.info("Using default values");
 			indexFile = new File("/opt/local/hpds/vcfIndex.tsv");
 			storageDir = new File(storageDirStr);
 		}
diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/VCFIndexBuilder.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/VCFIndexBuilder.java
new file mode 100644
index 00000000..9dbda27a
--- /dev/null
+++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/VCFIndexBuilder.java
@@ -0,0 +1,128 @@
+package edu.harvard.hms.dbmi.avillach.hpds.etl.genotype;
+
+import com.google.common.base.Joiner;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.util.*;
+import java.util.stream.Collectors;
+
+public class VCFIndexBuilder {
+
+	private final Set<String> validPatientTypes;
+
+
+	private final File vcfPatientMappingFile;
+	private final File patientUUIDToIdMappingFile;
+	private final File vcfIndexOutputDirectory;
+	private Map<String, List<String>> fileToPatientListMap;
+	private Map<String, String> patientUUIDToPatientIdMapping;
+
+	private static final String VCF_INDEX_DIRECTORY = "/opt/local/hpds";
+
+	private static final Joiner COMMA_JOINER = Joiner.on(",");
+
+	public VCFIndexBuilder(File vcfPatientMappingFile, File patientUUIDToIdMappingFile, File vcfIndexOutputDirectory, Set<String> validPatientTypes) {
+		this.vcfPatientMappingFile = vcfPatientMappingFile;
+		this.patientUUIDToIdMappingFile = patientUUIDToIdMappingFile;
+		this.vcfIndexOutputDirectory = vcfIndexOutputDirectory;
+		this.validPatientTypes = validPatientTypes;
+	}
+
+	public static void main(String[] args) {
+		if (args.length != 4) {
+			throw new IllegalArgumentException("There must be 4 parameters to VCFIndexBuilder");
+		}
+		File vcfIndexOutputDirectoryFile = new File(args[2]);
+		if (!vcfIndexOutputDirectoryFile.isDirectory()) {
+			throw new IllegalArgumentException("Argument 3 must be a valid directory");
+		}
+		new VCFIndexBuilder(
+			new File(args[0]),
+			new File(args[1]),
+			vcfIndexOutputDirectoryFile,
+			Set.of(args[3].split(","))
+		).run();
+	}
+
+	private void run() {
+
+		patientUUIDToPatientIdMapping = new HashMap<>();
+		fileToPatientListMap = new HashMap<>();
+		try {
+			Files.lines(patientUUIDToIdMappingFile.toPath())
+				.skip(1)
+				.map(l -> l.split(","))
+				.filter(columns -> columns.length == 2)
+				.forEach(columns -> {
+					patientUUIDToPatientIdMapping.put(columns[0], columns[1]);
+				});
+
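+			// Input formats, inferred from the parsing in this method (the header names
+			// are assumptions; only the column positions matter):
+			//
+			//   patientUUIDToIdMappingFile (CSV, header row skipped):
+			//     column 0: patient UUID, column 1: patient id
+			//
+			//   vcfPatientMappingFile (CSV, header row skipped):
+			//     column 0: patient UUID
+			//     column 1: VCF file path (only the basename after the last '/' is kept)
+			//     column 4: patient type, matched against the types passed in args[3]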
+			Files.lines(vcfPatientMappingFile.toPath())
+				.skip(1)
+				.map(l -> l.split(","))
+				.filter(columns -> validPatientTypes.contains(columns[4]))
+				.forEach(columns -> {
+					String patientUuid = columns[0];
+					String vcfFile = columns[1].substring(columns[1].lastIndexOf("/") + 1);
+					List<String> patientList = Optional.ofNullable(fileToPatientListMap.get(vcfFile)).orElseGet(ArrayList::new);
+					patientList.add(patientUuid);
+
+					if (patientUUIDToPatientIdMapping.get(patientUuid) != null) {
+						fileToPatientListMap.put(vcfFile, patientList);
+					}
+				});
+		} catch (IOException e) {
+			throw new UncheckedIOException(e);
+		}
+
+		writeVcfIndexes();
+	}
+
+	private void writeVcfIndexes() {
+		Map<String, List<String>> groupToVcfMapping = new HashMap<>();
+
+		for (String fileName : fileToPatientListMap.keySet()) {
+			String chromosome = extractChromosome(fileName);
+			String baseFile = fileName.substring(0, fileName.indexOf(".chr"));
+			List<String> vcfFiles = groupToVcfMapping.getOrDefault(baseFile, new ArrayList<>());
+			vcfFiles.add(fileName);
+			groupToVcfMapping.put(baseFile, vcfFiles);
+		}
+
+		groupToVcfMapping.keySet()
+			.stream()
+			.forEach(vcfGroup -> {
+				writeVcfIndex(vcfGroup, groupToVcfMapping.get(vcfGroup));
+			});
+		System.out.println(groupToVcfMapping.size());
+
+
+	}
+
+	private void writeVcfIndex(String vcfGroup, List<String> vcfFiles) {
+		try {
+			FileWriter fileWriter = new FileWriter(vcfIndexOutputDirectory.getAbsolutePath() + "/" + vcfGroup + "-vcfIndex.tsv");
+			fileWriter.write("\"vcf_path\"\t\"chromosome\"\t\"isAnnotated\"\t\"isGzipped\"\t\"sample_ids\"\t\"patient_ids\"\t\"sample_relationship\"\t\"related_sample_ids\"\n");
+			String vcfFile = vcfFiles.get(0);
+			Set<String> validPatientUUIDs = fileToPatientListMap.get(vcfFile)
+				.stream()
+				.filter(patientUUIDToPatientIdMapping::containsKey)
+				.collect(Collectors.toSet());
+			List<String> patientIds = validPatientUUIDs.stream().map(patientUUIDToPatientIdMapping::get).filter(Objects::nonNull).toList();
+			fileWriter.write("\"" + VCF_INDEX_DIRECTORY + "/" + vcfFile + "\"\t\"" + extractChromosome(vcfFile) + "\"\t\"1\"\t\"1\"\t");
+			fileWriter.write("\"" + COMMA_JOINER.join(validPatientUUIDs) + "\"\t\"" + COMMA_JOINER.join(patientIds) + "\"");
+			fileWriter.flush();
+			fileWriter.close();
+		} catch (IOException e) {
+			throw new UncheckedIOException(e);
+		}
+	}
+
+	private static String extractChromosome(String fileName) {
+		return fileName.substring(fileName.indexOf(".chr") + 4, fileName.lastIndexOf(".vcf.gz"));
+	}
+}

From e58588792aff156824e8dcf1165dec83310eba61 Mon Sep 17 00:00:00 2001
From: Ryan Amari
Date: Mon, 24 Feb 2025 09:58:44 -0500
Subject: [PATCH 02/25] ALS-7737: Refactor NewVCFLoader to better support
 running in parallel

---
 .../util/ConvertMergedInfoStoresToFBBIS.java  |  7 +-
 .../genotype/util/ReSplitMergeInfoStores.java | 12 +--
 .../hpds/etl/genotype/NewVCFLoader.java       | 80 ++++++++++++-------
 3 files changed, 59 insertions(+), 40 deletions(-)

diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/util/ConvertMergedInfoStoresToFBBIS.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/util/ConvertMergedInfoStoresToFBBIS.java
index 534d1019..6ef9e736 100644
--- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/util/ConvertMergedInfoStoresToFBBIS.java
+++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/util/ConvertMergedInfoStoresToFBBIS.java
@@ -1,14 +1,15 @@
 package edu.harvard.hms.dbmi.avillach.hpds.data.genotype.util;
 
+import edu.harvard.hms.dbmi.avillach.hpds.etl.genotype.NewVCFLoader;
+
 import
java.io.FileNotFoundException; import java.io.IOException; -import static edu.harvard.hms.dbmi.avillach.hpds.etl.genotype.NewVCFLoader.convertInfoStoresToByteIndexed; public class ConvertMergedInfoStoresToFBBIS { - public static void main(String[] args) throws FileNotFoundException, IOException { + public static void main(String[] args) throws IOException { - convertInfoStoresToByteIndexed(); + new NewVCFLoader().convertInfoStoresToByteIndexed(); } diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/util/ReSplitMergeInfoStores.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/util/ReSplitMergeInfoStores.java index ec5dc943..f49035ae 100644 --- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/util/ReSplitMergeInfoStores.java +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/util/ReSplitMergeInfoStores.java @@ -1,19 +1,19 @@ package edu.harvard.hms.dbmi.avillach.hpds.data.genotype.util; -import static edu.harvard.hms.dbmi.avillach.hpds.etl.genotype.NewVCFLoader.splitInfoStoresByColumn; +import edu.harvard.hms.dbmi.avillach.hpds.etl.genotype.NewVCFLoader; -import java.io.FileNotFoundException; import java.io.IOException; -import static edu.harvard.hms.dbmi.avillach.hpds.etl.genotype.NewVCFLoader.convertInfoStoresToByteIndexed; public class ReSplitMergeInfoStores { - public static void main(String[] args) throws FileNotFoundException, IOException { + public static void main(String[] args) throws IOException { + + NewVCFLoader newVCFLoader = new NewVCFLoader(); - splitInfoStoresByColumn(); + newVCFLoader.splitInfoStoresByColumn(); - convertInfoStoresToByteIndexed(); + newVCFLoader.convertInfoStoresToByteIndexed(); } diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/NewVCFLoader.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/NewVCFLoader.java index 66331f1c..7b2a9946 100644 --- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/NewVCFLoader.java +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/NewVCFLoader.java @@ -22,12 +22,17 @@ public class NewVCFLoader { + public static final String DEFAULT_VCF_INDEX_FILE = "/opt/local/hpds/vcfIndex.tsv"; + public static final String DEFAULT_STORAGE_DIR = "/opt/local/hpds/all"; + public static final String DEFAULT_MERGED_DIR = "/opt/local/hpds/merged"; private static Logger logger = LoggerFactory.getLogger(NewVCFLoader.class); - private static File storageDir = null; - private static String storageDirStr = "/opt/local/hpds/all"; - private static String mergedDirStr = "/opt/local/hpds/merged"; - private static VariantIndexBuilder variantIndexBuilder = new VariantIndexBuilder(); + private File indexFile; + private File storageDir; + private String storageDirStr; + private String mergedDirStr; + + private VariantIndexBuilder variantIndexBuilder; // DO NOT CHANGE THIS unless you want to reload all the data everywhere. private static int CHUNK_SIZE = 1000; @@ -36,39 +41,52 @@ public class NewVCFLoader { * @param args - if testing, this should be an array ['vcfIndexFile path', 'output storage dir', 'merged dir']. * by default this will be [/opt/local/hpds/vcfIndex.tsv, "/opt/local/hpds/all", "/opt/local/hpds/merged" ]. 
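	 * Example invocation (illustrative, using the default paths documented above):
	 *   NewVCFLoader /opt/local/hpds/vcfIndex.tsv /opt/local/hpds/all /opt/local/hpds/merged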
*/ - public static void main(String[] args) throws FileNotFoundException, IOException { - - File indexFile; + public static void main(String[] args) throws IOException { + + NewVCFLoader vcfLoader; if(args != null && args.length >= 3) { - logger.info("Reading parameters from input - this is a test"); - indexFile = new File(args[0]); - storageDirStr = args[1]; - storageDir = new File(args[1]); - mergedDirStr = args[2]; + logger.info("Reading parameters from input"); + vcfLoader = new NewVCFLoader(new File(args[0]), args[1], args[2]); } else { logger.info(args.length + " arguments provided"); logger.info("Using default values"); - indexFile = new File("/opt/local/hpds/vcfIndex.tsv"); - storageDir = new File(storageDirStr); + vcfLoader = new NewVCFLoader(); } - loadVCFs(indexFile); + + vcfLoader.loadVCFs(); + } + + public NewVCFLoader() { + this.indexFile = new File(DEFAULT_VCF_INDEX_FILE); + this.storageDirStr = DEFAULT_STORAGE_DIR; + this.storageDir = new File(DEFAULT_STORAGE_DIR); + this.mergedDirStr = DEFAULT_MERGED_DIR; + this.variantIndexBuilder = new VariantIndexBuilder(); + } + + public NewVCFLoader(File indexFile, String storageDir, String mergedDirStr) { + this.indexFile = indexFile; + this.storageDirStr = storageDir; + this.storageDir = new File(storageDir); + this.mergedDirStr = mergedDirStr; + this.variantIndexBuilder = new VariantIndexBuilder(); } - private static ExecutorService chunkWriteEx = Executors.newFixedThreadPool(1); + private ExecutorService chunkWriteEx = Executors.newFixedThreadPool(1); - private static ConcurrentHashMap infoStoreMap = new ConcurrentHashMap(); + private ConcurrentHashMap infoStoreMap = new ConcurrentHashMap(); - private static HashMap zygosityMaskStrings; + private HashMap zygosityMaskStrings; - private static TreeMap>> variantMaskStorage = new TreeMap<>(); + private TreeMap>> variantMaskStorage = new TreeMap<>(); - private static long startTime; + private long startTime; - private static List walkers = new ArrayList<>(); + private List walkers = new ArrayList<>(); - private static boolean contigIsHemizygous; + private boolean contigIsHemizygous; - private static void loadVCFs(File indexFile) throws IOException { + private void loadVCFs() throws IOException { startTime = System.currentTimeMillis(); List vcfIndexLines = parseVCFIndex(indexFile); for (VCFIndexLine line : vcfIndexLines) { @@ -234,7 +252,7 @@ private static void loadVCFs(File indexFile) throws IOException { saveVariantStore(store, variantMaskStorage); } - private static String sampleIdsForMask(String[] sampleIds, VariantMask variantMask) { + private String sampleIdsForMask(String[] sampleIds, VariantMask variantMask) { StringBuilder idList = new StringBuilder(); if (variantMask != null) { if (variantMask instanceof VariantMaskBitmaskImpl) { @@ -253,7 +271,7 @@ private static String sampleIdsForMask(String[] sampleIds, VariantMask variantMa return idList.toString(); } - private static void flipChunk(String lastContigProcessed, int lastChunkProcessed, int currentChunk, + private void flipChunk(String lastContigProcessed, int lastChunkProcessed, int currentChunk, String currentContig, boolean isLastChunk, String currentLine) throws IOException, FileNotFoundException { if (!currentContig.contentEquals(lastContigProcessed) || isLastChunk) { if (infoStoreFlipped.get(lastContigProcessed) == null || !infoStoreFlipped.get(lastContigProcessed)) { @@ -312,7 +330,7 @@ private static void flipChunk(String lastContigProcessed, int lastChunkProcessed } } - private static void 
saveVariantStore(VariantStore store, + private void saveVariantStore(VariantStore store, TreeMap>> variantMaskStorage) throws IOException, FileNotFoundException { store.setVariantMaskStorage(variantMaskStorage); @@ -325,7 +343,7 @@ private static void saveVariantStore(VariantStore store, logger.debug("Done saving variant masks."); } - private static void saveInfoStores() throws IOException, FileNotFoundException { + private void saveInfoStores() throws IOException, FileNotFoundException { logger.debug("Saving info" + (System.currentTimeMillis() - startTime) + " seconds"); try (FileOutputStream fos = new FileOutputStream(new File(storageDir, "infoStores.javabin")); GZIPOutputStream gzos = new GZIPOutputStream(fos); @@ -336,7 +354,7 @@ private static void saveInfoStores() throws IOException, FileNotFoundException { logger.info("completed load in " + (System.currentTimeMillis() - startTime) + " seconds"); } - public static void splitInfoStoresByColumn() throws FileNotFoundException, IOException { + public void splitInfoStoresByColumn() throws FileNotFoundException, IOException { logger.debug("Splitting" + (System.currentTimeMillis() - startTime) + " seconds"); try { VCFPerPatientInfoStoreSplitter.splitAll(storageDir, new File(mergedDirStr)); @@ -346,7 +364,7 @@ public static void splitInfoStoresByColumn() throws FileNotFoundException, IOExc logger.debug("Split" + (System.currentTimeMillis() - startTime) + " seconds"); } - public static void convertInfoStoresToByteIndexed() throws FileNotFoundException, IOException { + public void convertInfoStoresToByteIndexed() throws FileNotFoundException, IOException { logger.debug("Converting" + (System.currentTimeMillis() - startTime) + " seconds"); try { VCFPerPatientInfoStoreToFBBIISConverter.convertAll(mergedDirStr, storageDirStr); @@ -356,7 +374,7 @@ public static void convertInfoStoresToByteIndexed() throws FileNotFoundException logger.debug("Converted " + ((System.currentTimeMillis() - startTime) / 1000) + " seconds"); } - private static void shutdownChunkWriteExecutor() { + private void shutdownChunkWriteExecutor() { chunkWriteEx.shutdown(); while (!chunkWriteEx.isTerminated()) { try { @@ -379,7 +397,7 @@ private static ConcurrentHashMap convertLoadingMap static TreeMap infoStoreFlipped = new TreeMap(); - private static class VCFWalker implements Comparable { + private class VCFWalker implements Comparable { private List indices; private Integer[] vcfOffsets; From ea8846ac88af6bc51c62e77feada37184d9203c9 Mon Sep 17 00:00:00 2001 From: Ryan Amari Date: Wed, 26 Feb 2025 13:20:42 -0500 Subject: [PATCH 03/25] ALS-7737: Add VcfLoader that splits by chromosome. 
todo: variant metadata in a similar fashion --- .../hpds/etl/genotype/NewVCFLoader.java | 77 +++--- .../genotype/SplitChromosomeVcfLoader.java | 235 ++++++++++++++++++ .../processing/GenomicProcessorConfig.java | 14 +- .../application-integration-test.properties | 4 +- .../util/BuildIntegrationTestEnvironment.java | 22 +- 5 files changed, 307 insertions(+), 45 deletions(-) create mode 100644 etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/SplitChromosomeVcfLoader.java diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/NewVCFLoader.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/NewVCFLoader.java index 7b2a9946..2f55f442 100644 --- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/NewVCFLoader.java +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/NewVCFLoader.java @@ -27,15 +27,15 @@ public class NewVCFLoader { public static final String DEFAULT_MERGED_DIR = "/opt/local/hpds/merged"; private static Logger logger = LoggerFactory.getLogger(NewVCFLoader.class); - private File indexFile; - private File storageDir; - private String storageDirStr; - private String mergedDirStr; + protected File indexFile; + protected File storageDir; + protected String storageDirStr; + protected String mergedDirStr; - private VariantIndexBuilder variantIndexBuilder; + protected VariantIndexBuilder variantIndexBuilder; // DO NOT CHANGE THIS unless you want to reload all the data everywhere. - private static int CHUNK_SIZE = 1000; + protected static int CHUNK_SIZE = 1000; /** * @param args - if testing, this should be an array ['vcfIndexFile path', 'output storage dir', 'merged dir']. @@ -52,8 +52,12 @@ public static void main(String[] args) throws IOException { logger.info("Using default values"); vcfLoader = new NewVCFLoader(); } + vcfLoader.loadAndMerge(); + } - vcfLoader.loadVCFs(); + protected void loadAndMerge() throws IOException { + createWalkers(); + loadVCFs(); } public NewVCFLoader() { @@ -72,27 +76,23 @@ public NewVCFLoader(File indexFile, String storageDir, String mergedDirStr) { this.variantIndexBuilder = new VariantIndexBuilder(); } - private ExecutorService chunkWriteEx = Executors.newFixedThreadPool(1); + protected ExecutorService chunkWriteEx = Executors.newFixedThreadPool(1); - private ConcurrentHashMap infoStoreMap = new ConcurrentHashMap(); + protected ConcurrentHashMap infoStoreMap = new ConcurrentHashMap(); - private HashMap zygosityMaskStrings; + protected HashMap zygosityMaskStrings; - private TreeMap>> variantMaskStorage = new TreeMap<>(); + protected TreeMap>> variantMaskStorage = new TreeMap<>(); - private long startTime; + protected long startTime; - private List walkers = new ArrayList<>(); + protected List walkers = new ArrayList<>(); private boolean contigIsHemizygous; - private void loadVCFs() throws IOException { + protected void loadVCFs() throws IOException { startTime = System.currentTimeMillis(); - List vcfIndexLines = parseVCFIndex(indexFile); - for (VCFIndexLine line : vcfIndexLines) { - walkers.add(new VCFWalker(line)); - } - TreeSet allPatientIds = new TreeSet(); + TreeSet allPatientIds = new TreeSet<>(); // Pull the INFO columns out of the headers for each walker and add all patient ids walkers.stream().forEach(walker -> { @@ -252,7 +252,14 @@ private void loadVCFs() throws IOException { saveVariantStore(store, variantMaskStorage); } - private String sampleIdsForMask(String[] sampleIds, VariantMask variantMask) { + private void createWalkers() { + List 
vcfIndexLines = parseVCFIndex(indexFile); + for (VCFIndexLine line : vcfIndexLines) { + walkers.add(new VCFWalker(line)); + } + } + + protected String sampleIdsForMask(String[] sampleIds, VariantMask variantMask) { StringBuilder idList = new StringBuilder(); if (variantMask != null) { if (variantMask instanceof VariantMaskBitmaskImpl) { @@ -271,7 +278,7 @@ private String sampleIdsForMask(String[] sampleIds, VariantMask variantMask) { return idList.toString(); } - private void flipChunk(String lastContigProcessed, int lastChunkProcessed, int currentChunk, + protected void flipChunk(String lastContigProcessed, int lastChunkProcessed, int currentChunk, String currentContig, boolean isLastChunk, String currentLine) throws IOException, FileNotFoundException { if (!currentContig.contentEquals(lastContigProcessed) || isLastChunk) { if (infoStoreFlipped.get(lastContigProcessed) == null || !infoStoreFlipped.get(lastContigProcessed)) { @@ -330,7 +337,7 @@ private void flipChunk(String lastContigProcessed, int lastChunkProcessed, int c } } - private void saveVariantStore(VariantStore store, + protected void saveVariantStore(VariantStore store, TreeMap>> variantMaskStorage) throws IOException, FileNotFoundException { store.setVariantMaskStorage(variantMaskStorage); @@ -343,7 +350,7 @@ private void saveVariantStore(VariantStore store, logger.debug("Done saving variant masks."); } - private void saveInfoStores() throws IOException, FileNotFoundException { + protected void saveInfoStores() throws IOException, FileNotFoundException { logger.debug("Saving info" + (System.currentTimeMillis() - startTime) + " seconds"); try (FileOutputStream fos = new FileOutputStream(new File(storageDir, "infoStores.javabin")); GZIPOutputStream gzos = new GZIPOutputStream(fos); @@ -374,7 +381,7 @@ public void convertInfoStoresToByteIndexed() throws FileNotFoundException, IOExc logger.debug("Converted " + ((System.currentTimeMillis() - startTime) / 1000) + " seconds"); } - private void shutdownChunkWriteExecutor() { + protected void shutdownChunkWriteExecutor() { chunkWriteEx.shutdown(); while (!chunkWriteEx.isTerminated()) { try { @@ -397,16 +404,16 @@ private static ConcurrentHashMap convertLoadingMap static TreeMap infoStoreFlipped = new TreeMap(); - private class VCFWalker implements Comparable { + protected class VCFWalker implements Comparable { - private List indices; - private Integer[] vcfOffsets; - private Integer[] bitmaskOffsets; - private HashMap vcfIndexLookup; - private String currentLine; - private String[] currentLineSplit; - private BufferedReader vcfReader; - private VCFIndexLine vcfIndexLine; + protected List indices; + protected Integer[] vcfOffsets; + protected Integer[] bitmaskOffsets; + protected HashMap vcfIndexLookup; + protected String currentLine; + protected String[] currentLineSplit; + protected BufferedReader vcfReader; + protected VCFIndexLine vcfIndexLine; boolean hasNext = true; String currentContig; Integer currentPosition; @@ -489,7 +496,7 @@ private void setMasksForSample(char[][] zygosityMaskStrings, int index, int star zygosityMaskStrings[patientZygosityIndex][bitmaskOffsets[index]] = '1'; } - private String currentSpecNotation() { + protected String currentSpecNotation() { String[] variantInfo = currentLineSplit[7].split("[=;]"); String gene = "NULL"; String consequence = "NULL"; @@ -636,7 +643,7 @@ public int compareTo(VCFWalker o) { private static final int SAMPLE_RELATIONSHIPS_COLUMN = 6; private static final int RELATED_SAMPLE_IDS_COLUMN = 7; - private static class VCFIndexLine 
implements Comparable { + protected static class VCFIndexLine implements Comparable { String vcfPath; String contig; boolean isAnnotated; diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/SplitChromosomeVcfLoader.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/SplitChromosomeVcfLoader.java new file mode 100644 index 00000000..0b0dad74 --- /dev/null +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/SplitChromosomeVcfLoader.java @@ -0,0 +1,235 @@ +package edu.harvard.hms.dbmi.avillach.hpds.etl.genotype; + +import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariableVariantMasks; +import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantMask; +import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantStore; +import edu.harvard.hms.dbmi.avillach.hpds.storage.FileBackedByteIndexedStorage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; + +public class SplitChromosomeVcfLoader extends NewVCFLoader { + + private static Logger logger = LoggerFactory.getLogger(SplitChromosomeVcfLoader.class); + private String[] allSampleIds; + private Integer[] patientIds; + private TreeSet allPatientIds; + + private final String baseStorageDir; + private final String baseMergeDir; + + public SplitChromosomeVcfLoader(File file, String baseStorageDir, String baseMergeDir) { + super(file, baseStorageDir, baseMergeDir); + this.baseStorageDir = baseStorageDir; + this.baseMergeDir = baseMergeDir; + } + + public SplitChromosomeVcfLoader() { + super(); + this.baseStorageDir = DEFAULT_STORAGE_DIR; + this.baseMergeDir = DEFAULT_MERGED_DIR; + } + + + + public static void main(String[] args) throws IOException { + NewVCFLoader vcfLoader; + if(args != null && args.length >= 3) { + logger.info("Reading parameters from input"); + vcfLoader = new SplitChromosomeVcfLoader(new File(args[0]), args[1], args[2]); + } else { + logger.info(args.length + " arguments provided"); + logger.info("Using default values"); + vcfLoader = new NewVCFLoader(); + } + vcfLoader.loadAndMerge(); + + vcfLoader.shutdownChunkWriteExecutor(); + } + + + protected void loadVCFs() throws IOException { + startTime = System.currentTimeMillis(); + allPatientIds = new TreeSet<>(); + + // Pull the INFO columns out of the headers for each walker and add all patient ids + walkers.stream().forEach(walker -> { + try { + logger.info("Reading headers of VCF [" + walker.vcfIndexLine.vcfPath + "]"); + walker.readHeaders(infoStoreMap); + allPatientIds.addAll(Arrays.asList(walker.vcfIndexLine.patientIds)); + } catch (IOException e) { + logger.error("Error while reading headers of VCF [" + walker.vcfIndexLine.vcfPath + "]", e); + System.exit(-1); + } + }); + + patientIds = allPatientIds.toArray(new Integer[0]); + allSampleIds = new String[allPatientIds.size()]; + + walkers.parallelStream().forEach(walker -> { + logger.info("Setting bitmask offsets for VCF [" + walker.vcfIndexLine.vcfPath + "]"); + walker.setBitmaskOffsets(patientIds); + for (int x = 0; x < walker.vcfIndexLine.sampleIds.length; x++) { + allSampleIds[Arrays.binarySearch(patientIds, + walker.vcfIndexLine.patientIds[x])] = walker.vcfIndexLine.sampleIds[x]; + } + }); + + for (VCFWalker walker : walkers) { + chunkWriteEx = Executors.newFixedThreadPool(1); + storageDirStr = 
baseStorageDir + "/" + walker.currentContig; + storageDir = new File(storageDirStr); + storageDir.mkdirs(); + mergedDirStr = baseMergeDir + "/" + walker.currentContig; + new File(mergedDirStr).mkdirs(); + variantIndexBuilder = new VariantIndexBuilder(); + variantMaskStorage = new TreeMap<>(); + walkers = new ArrayList<>(); + walkers.add(walker); + loadSingleContig(); + } + } + + private void loadSingleContig() throws IOException { + VariantStore store = new VariantStore(); + store.setPatientIds(allPatientIds.stream().map((id) -> { + return id.toString(); + }).collect(Collectors.toList()).toArray(new String[0])); + + String lastContigProcessed = null; + int lastChunkProcessed = 0; + int currentChunk = 0; + String[] currentContig = new String[1]; + int[] currentPosition = { -1 }; + String[] currentRef = new String[1]; + String[] currentAlt = new String[1]; + + zygosityMaskStrings = new HashMap(); + + List positionsProcessedInChunk = new ArrayList<>(); + while (walkers.stream().anyMatch(walker -> { + return walker.hasNext; + })) { + Collections.sort(walkers); + VCFWalker lowestWalker = walkers.get(0); + String currentSpecNotation = lowestWalker.currentSpecNotation(); + currentContig[0] = lowestWalker.currentContig; + currentPosition[0] = lowestWalker.currentPosition; + currentRef[0] = lowestWalker.currentRef; + currentAlt[0] = lowestWalker.currentAlt; + currentChunk = lowestWalker.currentPosition / CHUNK_SIZE; + positionsProcessedInChunk.add(currentPosition[0]); + + if (lastContigProcessed == null) { + lastContigProcessed = lowestWalker.currentContig; + } + + flipChunk(lastContigProcessed, lastChunkProcessed, currentChunk, currentContig[0], false, + lowestWalker.currentLine); + lastContigProcessed = lowestWalker.currentContig; + lastChunkProcessed = currentChunk; + + char[][][] maskStringsForVariantSpec = { zygosityMaskStrings.get(currentSpecNotation) }; + if (maskStringsForVariantSpec[0] == null) { + maskStringsForVariantSpec[0] = new char[7][allPatientIds.size()]; + for (int x = 0; x < maskStringsForVariantSpec[0].length; x++) { + maskStringsForVariantSpec[0][x] = new char[allPatientIds.size()]; + for (int y = 0; y < allPatientIds.size(); y++) { + maskStringsForVariantSpec[0][x][y] = '0'; + } + } + } + walkers.stream().filter((walker) -> { + return walker.currentPosition == currentPosition[0] && walker.currentAlt == currentAlt[0] + && walker.currentRef == currentRef[0] && walker.currentContig.contentEquals(currentContig[0]); + }).forEach((walker) -> { + walker.updateRecords(maskStringsForVariantSpec[0], infoStoreMap); + try { + walker.nextLine(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + zygosityMaskStrings.put(currentSpecNotation, maskStringsForVariantSpec[0]); + walkers = walkers.stream().filter((walker) -> { + return walker.hasNext; + }).collect(Collectors.toList()); + } + + flipChunk(lastContigProcessed, lastChunkProcessed, currentChunk, currentContig[0], true, null); + + shutdownChunkWriteExecutor(); + + saveInfoStores(); + + splitInfoStoresByColumn(); + + convertInfoStoresToByteIndexed(); + + if (logger.isDebugEnabled()) { + // Log out the first and last 50 variants + int[] count = { 0 }; + for (String contig : store.getVariantMaskStorage().keySet()) { + ArrayList chunkIds = new ArrayList<>(); + FileBackedByteIndexedStorage> chromosomeStorage = store.getVariantMaskStorage() + .get(contig); + if (chromosomeStorage != null) { + // print out the top and bottom 50 variants in the store (that have masks) + chunkIds.addAll(chromosomeStorage.keys()); + for 
(Integer chunkId : chunkIds) { + for (String variantSpec : chromosomeStorage.get(chunkId).keySet()) { + count[0]++; + VariableVariantMasks variantMasks = chromosomeStorage.get(chunkId).get(variantSpec); + if (variantMasks != null) { + VariantMask heterozygousMask = variantMasks.heterozygousMask; + String heteroIdList = sampleIdsForMask(allSampleIds, heterozygousMask); + VariantMask homozygousMask = variantMasks.homozygousMask; + String homoIdList = sampleIdsForMask(allSampleIds, homozygousMask); + + if (!heteroIdList.isEmpty() && heteroIdList.length() < 1000) + logger.debug(variantSpec + " : heterozygous : " + heteroIdList); + if (!homoIdList.isEmpty() && homoIdList.length() < 1000) + logger.debug(variantSpec + " : homozygous : " + homoIdList); + } + } + if (count[0] > 50) + break; + } + + count[0] = 0; + for (int x = chunkIds.size() - 1; x > 0; x--) { + int chunkId = chunkIds.get(x); + chromosomeStorage.get(chunkId).keySet().forEach((variantSpec) -> { + count[0]++; + VariableVariantMasks variantMasks = chromosomeStorage.get(chunkId).get(variantSpec); + if (variantMasks != null) { + VariantMask heterozygousMask = variantMasks.heterozygousMask; + String heteroIdList = sampleIdsForMask(allSampleIds, heterozygousMask); + VariantMask homozygousMask = variantMasks.homozygousMask; + String homoIdList = sampleIdsForMask(allSampleIds, homozygousMask); + + if (!heteroIdList.isEmpty() && heteroIdList.length() < 1000) + logger.debug(variantSpec + " : heterozygous : " + heteroIdList); + if (!homoIdList.isEmpty() && homoIdList.length() < 1000) + logger.debug(variantSpec + " : homozygous : " + homoIdList); + } + }); + if (count[0] > 50) + break; + } + } + } + } + + store.setVariantSpecIndex(variantIndexBuilder.getVariantSpecIndex().toArray(new String[0])); + saveVariantStore(store, variantMaskStorage); + } +} diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorConfig.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorConfig.java index 35286128..1fc74559 100644 --- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorConfig.java +++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorConfig.java @@ -34,11 +34,19 @@ public GenomicProcessor localGenomicProcessor() { @Bean(name = "localDistributedGenomicProcessor") @ConditionalOnProperty(prefix = "hpds.genomicProcessor", name = "impl", havingValue = "localDistributed") public GenomicProcessor localDistributedGenomicProcessor() { - List genomicProcessors = Flux.range(1, 22) - .flatMap(i -> Mono.fromCallable(() -> (GenomicProcessor) new GenomicProcessorNodeImpl(hpdsGenomicDataDirectory + "/" + i + "/")).subscribeOn(Schedulers.boundedElastic())) + // assumed for now that all first level directories contain a genomic dataset for a group of studies + File[] directories = new File(hpdsGenomicDataDirectory).listFiles(File::isDirectory); + if (directories.length > 50) { + throw new IllegalArgumentException("Number of chromosome partitions exceeds maximum of 50 (" + directories.length + ")"); + } + + List genomicProcessors = Flux.fromArray(directories) + .flatMap(i -> Mono.fromCallable(() -> { + return (GenomicProcessor) new GenomicProcessorNodeImpl(i.getAbsolutePath() + "/"); + }).subscribeOn(Schedulers.boundedElastic())) .collectList() .block(); - genomicProcessors.add(new GenomicProcessorNodeImpl(hpdsGenomicDataDirectory + "/X/")); + //genomicProcessors.add(new 
GenomicProcessorNodeImpl(hpdsGenomicDataDirectory + "/X/")); return new GenomicProcessorParentImpl(genomicProcessors); } diff --git a/service/src/main/resources/application-integration-test.properties b/service/src/main/resources/application-integration-test.properties index 90547f24..006af76d 100644 --- a/service/src/main/resources/application-integration-test.properties +++ b/service/src/main/resources/application-integration-test.properties @@ -4,6 +4,6 @@ LARGE_TASK_THREADS = 1 ID_BATCH_SIZE=1000 VCF_EXCERPT_ENABLED=true -hpds.genomicProcessor.impl=local -HPDS_GENOMIC_DATA_DIRECTORY=target/ +hpds.genomicProcessor.impl=localDistributed +HPDS_GENOMIC_DATA_DIRECTORY=target/all/ HPDS_DATA_DIRECTORY=target/test-classes/phenotypic/ \ No newline at end of file diff --git a/service/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/test/util/BuildIntegrationTestEnvironment.java b/service/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/test/util/BuildIntegrationTestEnvironment.java index 4b91cece..05f2cf21 100644 --- a/service/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/test/util/BuildIntegrationTestEnvironment.java +++ b/service/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/test/util/BuildIntegrationTestEnvironment.java @@ -3,29 +3,41 @@ import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.BucketIndexBySample; import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantStore; import edu.harvard.hms.dbmi.avillach.hpds.etl.genotype.NewVCFLoader; +import edu.harvard.hms.dbmi.avillach.hpds.etl.genotype.SplitChromosomeVcfLoader; import edu.harvard.hms.dbmi.avillach.hpds.etl.genotype.VariantMetadataLoader; import edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.CSVLoader; import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.List; public enum BuildIntegrationTestEnvironment { INSTANCE; private static final String PHENOTYPIC_DATA_DIRECTORY = "target/test-classes/phenotypic/"; private final String VCF_INDEX_FILE = "./src/test/resources/test_vcfIndex.tsv"; - private final String STORAGE_DIR = "./target/"; + private final String STORAGE_DIR = "./target/all/"; private final String MERGED_DIR = "./target/merged/"; public String binFile = "target/VariantMetadata.javabin"; BuildIntegrationTestEnvironment() { try { - NewVCFLoader.main(new String[] {VCF_INDEX_FILE, STORAGE_DIR, MERGED_DIR}); + SplitChromosomeVcfLoader.main(new String[] {VCF_INDEX_FILE, STORAGE_DIR, MERGED_DIR}); CSVLoader.main(new String[] {PHENOTYPIC_DATA_DIRECTORY}); VariantMetadataLoader.main(new String[] {"./src/test/resources/test_vcfIndex.tsv", binFile, "target/VariantMetadataStorage.bin"}); - VariantStore variantStore = VariantStore.readInstance(STORAGE_DIR); - BucketIndexBySample bucketIndexBySample = new BucketIndexBySample(variantStore, STORAGE_DIR); - } catch (IOException | ClassNotFoundException e) { + List.of("chr4", "chr20", "chr21").forEach(dir -> { + VariantStore variantStore = null; + try { + variantStore = VariantStore.readInstance(STORAGE_DIR + dir + "/"); + BucketIndexBySample bucketIndexBySample = new BucketIndexBySample(variantStore, STORAGE_DIR + dir + "/"); + } catch (IOException e) { + throw new UncheckedIOException(e); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + }); + } catch (IOException e) { throw new RuntimeException(e); } From e6338be8a08c1ecf105afca6a3239013c295601c Mon Sep 17 00:00:00 2001 From: Ryan Amari Date: Wed, 26 Feb 2025 15:17:10 -0500 Subject: [PATCH 04/25] ALS-7737: Add VariantMetadataLoader that loads per chromosome --- 
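Assuming the default /opt/local/hpds layout, the per-chromosome output of this
loader is expected to look roughly like the following (contig directory names are
illustrative; they come from the contig column of each annotated VCF, and the
storage file name is built from VARIANT_METADATA_STORAGE_FILE_PREFIX):

    /opt/local/hpds/all/chr4/VariantMetadata.javabin
    /opt/local/hpds/all/chr4/VariantMetadataStorage*.bin
    /opt/local/hpds/all/chr21/VariantMetadata.javabin
    ...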
 .../data/genotype/VariantMetadataIndex.java   |   2 +-
 .../SplitChromosomeVariantMetadataLoader.java | 178 ++++++++++++++++++
 .../genotype/SplitChromosomeVcfLoader.java    |   4 +-
 .../util/BuildIntegrationTestEnvironment.java |   3 +-
 4 files changed, 183 insertions(+), 4 deletions(-)
 create mode 100644 etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/SplitChromosomeVariantMetadataLoader.java

diff --git a/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/VariantMetadataIndex.java b/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/VariantMetadataIndex.java
index ebdb3413..63272e04 100644
--- a/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/VariantMetadataIndex.java
+++ b/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/VariantMetadataIndex.java
@@ -34,7 +34,7 @@ public class VariantMetadataIndex implements Serializable {
 	private final Map<String, FileBackedByteIndexedStorage<Integer, ConcurrentHashMap<String, String[]>> > indexMap = new HashMap<>();
 
 	public static final String VARIANT_METADATA_STORAGE_FILE_PREFIX = "VariantMetadataStorage";
-	private static String fileStoragePrefix = "/opt/local/hpds/all/" + VARIANT_METADATA_STORAGE_FILE_PREFIX;
+	private String fileStoragePrefix = "/opt/local/hpds/all/" + VARIANT_METADATA_STORAGE_FILE_PREFIX;
 
 	/**
 	 * This map allows us to load millions of variants without re-writing the fbbis each time (which would blow up the disk space).
diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/SplitChromosomeVariantMetadataLoader.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/SplitChromosomeVariantMetadataLoader.java
new file mode 100644
index 00000000..412f6c20
--- /dev/null
+++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/SplitChromosomeVariantMetadataLoader.java
@@ -0,0 +1,178 @@
+package edu.harvard.hms.dbmi.avillach.hpds.etl.genotype;
+
+
+import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantMetadataIndex;
+import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantSpec;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.TreeSet;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import static edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantMetadataIndex.VARIANT_METADATA_FILENAME;
+import static edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantMetadataIndex.VARIANT_METADATA_STORAGE_FILE_PREFIX;
+
+/**
+ *
+ * This loader will read in the metadata associated with each variant and build a VariantMetadataIndex that
+ * can be used to populate data in the PIC-SURE Variant Explorer.
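+ *
+ * Typical invocation (illustrative; the two arguments are the vcfIndex.tsv path and the
+ * HPDS data directory, falling back to /opt/local/hpds/vcfIndex.tsv and /opt/local/hpds/all/
+ * when omitted):
+ *
+ *   SplitChromosomeVariantMetadataLoader /opt/local/hpds/vcfIndex.tsv /opt/local/hpds/all/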
+ */ +public class SplitChromosomeVariantMetadataLoader { + + public static final String DEFAULT_TSV_FILENAME = "vcfIndex.tsv"; + public static final String DEFAULT_HPDS_DIRECTORY = "/opt/local/hpds/"; + + public static final String DEFAULT_HPDS_DATA_DIRECTORY = DEFAULT_HPDS_DIRECTORY + "all/"; + private static Logger log = LoggerFactory.getLogger(SplitChromosomeVariantMetadataLoader.class); + + //fields to allow tests to override default file location + private static String hpdsDataPath = null; + + private static final int + INFO_COLUMN = 7, + ANNOTATED_FLAG_COLUMN = 2, + GZIP_FLAG_COLUMN=3, + FILE_COLUMN = 0; + + public static void main(String[] args) throws IOException { + File vcfIndexFile; + + log.info(new File(".").getAbsolutePath()); + if(args.length > 0 && new File(args[0]).exists()) { + log.info("using path from command line"); + vcfIndexFile = new File(args[0]); + hpdsDataPath = args[1]; + }else { + hpdsDataPath = DEFAULT_HPDS_DATA_DIRECTORY; + vcfIndexFile = new File(DEFAULT_HPDS_DIRECTORY + DEFAULT_TSV_FILENAME); + } + + List vcfIndexFiles = new ArrayList<>(); + + try(CSVParser parser = CSVParser.parse(vcfIndexFile, Charset.forName("UTF-8"), CSVFormat.DEFAULT.withDelimiter('\t').withSkipHeaderRecord(true))) { + final boolean[] horribleHeaderSkipFlag = {false}; + parser.forEach((CSVRecord r)->{ + if(horribleHeaderSkipFlag[0]) { + File vcfFileLocal = new File(r.get(FILE_COLUMN)); + if(Integer.parseInt(r.get(ANNOTATED_FLAG_COLUMN).trim())==1) { + VcfInputFile vcfInput = new VcfInputFile(vcfFileLocal, (Integer.parseInt(r.get(GZIP_FLAG_COLUMN).trim())==1)); + vcfIndexFiles.add(vcfInput); + } + }else { + horribleHeaderSkipFlag[0] = true; + } + }); + } + + vcfIndexFiles.parallelStream().forEach(SplitChromosomeVariantMetadataLoader::createVariantMetadataIndexForContig); + + + } + + private static void createVariantMetadataIndexForContig(VcfInputFile vcfInput) { + try { + String contig = vcfInput.currentContig; + VariantMetadataIndex metadataIndex = new VariantMetadataIndex(hpdsDataPath + contig + "/" + VARIANT_METADATA_STORAGE_FILE_PREFIX); + + while (vcfInput.hasNextVariant()) { + metadataIndex.put(vcfInput.currentVariantSpec, vcfInput.currentMetaData); + vcfInput.nextVariant(); + } + + metadataIndex.complete(); + + //store this in a path per contig (or a preset path + String binfilePath = hpdsDataPath + contig + "/" + VARIANT_METADATA_FILENAME; + + try(ObjectOutputStream out = new ObjectOutputStream(new GZIPOutputStream(new FileOutputStream(new File(binfilePath))))){ + out.writeObject(metadataIndex); + out.flush(); + } + + log.info("Finished processing: "+ vcfInput.fileName); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + private static class VcfInputFile implements Comparable { + + Iterator iterator; + CSVParser parser; + + String fileName; + String currentContig; + String currentVariantSpec; + String currentMetaData; + + /** + * read in an vcfFile, skip the header rows, and queue up the first variant (with metadata) + * @param vcfFile + * @param gzipped + */ + public VcfInputFile(File vcfFile, boolean gzipped) { + fileName = vcfFile.getName(); + log.info("Processing VCF file: " + fileName); + try{ + Reader reader = new InputStreamReader( gzipped ? 
new GZIPInputStream(new FileInputStream(vcfFile)) : new FileInputStream(vcfFile)); + parser = new CSVParser(reader, CSVFormat.DEFAULT.withDelimiter('\t').withSkipHeaderRecord(false)); + + iterator = parser.iterator(); + while(iterator.hasNext()) { + CSVRecord csvRecord = iterator.next(); + //skip all header rows + if(csvRecord.get(0).startsWith("#")) { + continue; + } + + VariantSpec variantSpec = new VariantSpec(csvRecord); + currentContig = variantSpec.metadata.chromosome; + currentVariantSpec = variantSpec.specNotation(); + currentMetaData = csvRecord.get(INFO_COLUMN).trim(); + break; + } + + }catch(IOException e) { + log.error("Error processing VCF file: " + vcfFile.getName(), e); + } + + } + + boolean hasNextVariant() { + return iterator.hasNext(); + } + + void nextVariant() { + CSVRecord csvRecord = iterator.next(); + //skip all header rows + if(csvRecord.get(0).startsWith("#")) { + return; + } + + VariantSpec variantSpec = new VariantSpec(csvRecord); + currentContig = variantSpec.metadata.chromosome; + currentVariantSpec = variantSpec.specNotation(); + currentMetaData = csvRecord.get(INFO_COLUMN).trim(); + } + + /** + * These files will be sorted by the current variant spec. We need to make sure they are never actually + * equal values (since the TreeSet used to keep them sorted enforces uniqueness) + */ + @Override + public int compareTo(VcfInputFile arg0) { + return (currentVariantSpec + iterator.toString()).compareTo(arg0.currentVariantSpec + arg0.iterator.toString()); + } + + + } +} diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/SplitChromosomeVcfLoader.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/SplitChromosomeVcfLoader.java index 0b0dad74..69e8b091 100644 --- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/SplitChromosomeVcfLoader.java +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/SplitChromosomeVcfLoader.java @@ -115,7 +115,7 @@ private void loadSingleContig() throws IOException { zygosityMaskStrings = new HashMap(); List positionsProcessedInChunk = new ArrayList<>(); - while (walkers.stream().anyMatch(walker -> { + while (walkers.parallelStream().anyMatch(walker -> { return walker.hasNext; })) { Collections.sort(walkers); @@ -159,7 +159,7 @@ private void loadSingleContig() throws IOException { } }); zygosityMaskStrings.put(currentSpecNotation, maskStringsForVariantSpec[0]); - walkers = walkers.stream().filter((walker) -> { + walkers = walkers.parallelStream().filter((walker) -> { return walker.hasNext; }).collect(Collectors.toList()); } diff --git a/service/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/test/util/BuildIntegrationTestEnvironment.java b/service/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/test/util/BuildIntegrationTestEnvironment.java index 05f2cf21..a3b89ef3 100644 --- a/service/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/test/util/BuildIntegrationTestEnvironment.java +++ b/service/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/test/util/BuildIntegrationTestEnvironment.java @@ -3,6 +3,7 @@ import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.BucketIndexBySample; import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantStore; import edu.harvard.hms.dbmi.avillach.hpds.etl.genotype.NewVCFLoader; +import edu.harvard.hms.dbmi.avillach.hpds.etl.genotype.SplitChromosomeVariantMetadataLoader; import edu.harvard.hms.dbmi.avillach.hpds.etl.genotype.SplitChromosomeVcfLoader; import 
edu.harvard.hms.dbmi.avillach.hpds.etl.genotype.VariantMetadataLoader; import edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.CSVLoader; @@ -25,7 +26,7 @@ public enum BuildIntegrationTestEnvironment { try { SplitChromosomeVcfLoader.main(new String[] {VCF_INDEX_FILE, STORAGE_DIR, MERGED_DIR}); CSVLoader.main(new String[] {PHENOTYPIC_DATA_DIRECTORY}); - VariantMetadataLoader.main(new String[] {"./src/test/resources/test_vcfIndex.tsv", binFile, "target/VariantMetadataStorage.bin"}); + SplitChromosomeVariantMetadataLoader.main(new String[] {"./src/test/resources/test_vcfIndex.tsv", STORAGE_DIR}); List.of("chr4", "chr20", "chr21").forEach(dir -> { VariantStore variantStore = null; try { From 04e47bf58b6b03db15cf7742f7e22766418cc622 Mon Sep 17 00:00:00 2001 From: Ryan Amari Date: Thu, 27 Feb 2025 14:08:55 -0500 Subject: [PATCH 05/25] ALS-7737: Fix bugs discovered by splitting genomic data by chromosome --- .../data/genotype/BucketIndexBySample.java | 6 +- .../SplitChromosomeVariantMetadataLoader.java | 3 +- .../processing/GenomicProcessorNodeImpl.java | 25 ++++--- .../hpds/test/BucketIndexBySampleTest.java | 67 ++++++++----------- .../hpds/test/VariantMetadataIndexTest.java | 35 ++++++---- 5 files changed, 74 insertions(+), 62 deletions(-) diff --git a/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/BucketIndexBySample.java b/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/BucketIndexBySample.java index 012fb909..c275063d 100644 --- a/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/BucketIndexBySample.java +++ b/data/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/genotype/BucketIndexBySample.java @@ -159,7 +159,11 @@ public Set filterVariantSetForPatientSet(Set variantSet, Collect String bucketKey = variantSpec.split(",")[0] + ":" + (Integer.parseInt(variantSpec.split(",")[1])/1000); //testBit uses inverted indexes include +2 offset for bookends - return _bucketMask.testBit(maxIndex - Collections.binarySearch(bucketList, bucketKey) + 2); + int bucketKeyIndex = Collections.binarySearch(bucketList, bucketKey); + if (bucketKeyIndex < 0) { + return false; + } + return _bucketMask.testBit(maxIndex - bucketKeyIndex + 2); }).collect(Collectors.toSet()); } diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/SplitChromosomeVariantMetadataLoader.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/SplitChromosomeVariantMetadataLoader.java index 412f6c20..820c3c7b 100644 --- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/SplitChromosomeVariantMetadataLoader.java +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/SplitChromosomeVariantMetadataLoader.java @@ -73,7 +73,7 @@ public static void main(String[] args) throws IOException { }); } - vcfIndexFiles.parallelStream().forEach(SplitChromosomeVariantMetadataLoader::createVariantMetadataIndexForContig); + vcfIndexFiles.stream().forEach(SplitChromosomeVariantMetadataLoader::createVariantMetadataIndexForContig); } @@ -87,6 +87,7 @@ private static void createVariantMetadataIndexForContig(VcfInputFile vcfInput) { metadataIndex.put(vcfInput.currentVariantSpec, vcfInput.currentMetaData); vcfInput.nextVariant(); } + metadataIndex.put(vcfInput.currentVariantSpec, vcfInput.currentMetaData); metadataIndex.complete(); diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorNodeImpl.java 
b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorNodeImpl.java index ea612b89..58286fac 100644 --- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorNodeImpl.java +++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorNodeImpl.java @@ -173,14 +173,20 @@ private List getVariantsMatchingFilters(Query.VariantInfoFilter fi } if(filter.numericVariantInfoFilters != null && !filter.numericVariantInfoFilters.isEmpty()) { filter.numericVariantInfoFilters.forEach((String column, Filter.FloatFilter doubleFilter)->{ - FileBackedByteIndexedInfoStore infoStore = getInfoStore(column); + Optional infoStoreOptional = getInfoStore(column); doubleFilter.getMax(); Range filterRange = Range.closed(doubleFilter.getMin(), doubleFilter.getMax()); - List valuesInRange = infoStore.continuousValueIndex.getValuesInRange(filterRange); - for(String value : valuesInRange) { - variantIndices.add(variantIndexCache.get(column, value)); - } + infoStoreOptional.ifPresentOrElse(infoStore -> { + List valuesInRange = infoStore.continuousValueIndex.getValuesInRange(filterRange); + for(String value : valuesInRange) { + variantIndices.add(variantIndexCache.get(column, value)); + }}, + () -> { + variantIndices.add(VariantIndex.empty()); + } + ); + }); } return variantIndices; @@ -190,9 +196,10 @@ private VariantIndex getComputedVariantIndexForCategoryFilter(Map.Entry infoStoreOptional = getInfoStore(column); - List infoKeys = filterInfoCategoryKeys(values, infoStore); + List infoKeys = infoStoreOptional.map(infoStore -> filterInfoCategoryKeys(values, infoStore)) + .orElseGet(ArrayList::new); if(infoKeys.size()>1) { // These should be ANDed @@ -227,8 +234,8 @@ public VariantMask createMaskForPatientSet(Set patientSubset) { return new VariantMaskBitmaskImpl(patientVariantJoinHandler.createMaskForPatientSet(patientSubset)); } - private FileBackedByteIndexedInfoStore getInfoStore(String column) { - return infoStores.get(column); + private Optional getInfoStore(String column) { + return Optional.ofNullable(infoStores.get(column)); } private VariantIndex addVariantsForInfoFilter(VariantIndex unionOfInfoFilters, Query.VariantInfoFilter filter) { diff --git a/service/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/test/BucketIndexBySampleTest.java b/service/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/test/BucketIndexBySampleTest.java index 81bffd5f..193392f3 100644 --- a/service/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/test/BucketIndexBySampleTest.java +++ b/service/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/test/BucketIndexBySampleTest.java @@ -32,10 +32,12 @@ */ public class BucketIndexBySampleTest { - private static final String STORAGE_DIR = "./target/"; + private static final String STORAGE_DIR = "./target/all/"; + + private static BucketIndexBySample bucketIndexBySampleChr4; + private static BucketIndexBySample bucketIndexBySampleChr20; + private static BucketIndexBySample bucketIndexBySampleChr21; - private static BucketIndexBySample bucketIndexBySample; - //Some known variant specs from the input file. //Some known variant specs from the input file. 
These have been designed for testing partially overlapping specs private static final String spec1 = "chr4,9856624,CAAAAA,C,TVP23A,splice_acceptor_variant"; @@ -58,10 +60,14 @@ public class BucketIndexBySampleTest { @BeforeAll public static void initializeBinfile() throws Exception { BuildIntegrationTestEnvironment instance = BuildIntegrationTestEnvironment.INSTANCE; - VariantStore variantStore = VariantStore.readInstance(STORAGE_DIR); - - //now use that object to initialize the BucketIndexBySample object - bucketIndexBySample = new BucketIndexBySample(variantStore, STORAGE_DIR); + VariantStore variantStoreChr4 = VariantStore.readInstance(STORAGE_DIR + "chr4/"); + bucketIndexBySampleChr4 = new BucketIndexBySample(variantStoreChr4, STORAGE_DIR + "chr4/"); + + VariantStore variantStoreChr20 = VariantStore.readInstance(STORAGE_DIR + "chr20/"); + bucketIndexBySampleChr20 = new BucketIndexBySample(variantStoreChr20, STORAGE_DIR + "chr20/"); + + VariantStore variantStoreChr21 = VariantStore.readInstance(STORAGE_DIR + "chr21/"); + bucketIndexBySampleChr21 = new BucketIndexBySample(variantStoreChr21, STORAGE_DIR + "chr21/"); } @BeforeEach @@ -77,7 +83,7 @@ public void test_filterVariantSetForPatientSet_noPatients() throws IOException { variantSet.add(spec2); variantSet.add(spec3); - Collection filteredVariantSet = bucketIndexBySample.filterVariantSetForPatientSet(variantSet, patientSet); + Collection filteredVariantSet = bucketIndexBySampleChr4.filterVariantSetForPatientSet(variantSet, patientSet); assertTrue("Empty Patient List should filter out all variants", filteredVariantSet.isEmpty()); } @@ -88,7 +94,7 @@ public void test_filterVariantSetForPatientSet_noVariants() throws IOException { patientSet.add(200689); patientSet.add(200972); - Collection filteredVariantSet = bucketIndexBySample.filterVariantSetForPatientSet(variantSet, patientSet); + Collection filteredVariantSet = bucketIndexBySampleChr4.filterVariantSetForPatientSet(variantSet, patientSet); assertTrue("Empty Variant Set should remain empty", filteredVariantSet.isEmpty()); } @@ -106,7 +112,7 @@ public void test_filterVariantSetForPatientSet_VariantsWithoutPatientsLastBucket patientSet.add(200706); patientSet.add(200709); - Collection filteredVariantSet = bucketIndexBySample.filterVariantSetForPatientSet(variantSet, patientSet); + Collection filteredVariantSet = bucketIndexBySampleChr4.filterVariantSetForPatientSet(variantSet, patientSet); assertTrue("Patients should not match any variants in the list", filteredVariantSet.isEmpty()); } @@ -122,22 +128,23 @@ public void test_filterVariantSetForPatientSet_PatientsWithNoVariantsFirstBucket patientSet.add(197508); patientSet.add(197509); - Collection filteredVariantSet = bucketIndexBySample.filterVariantSetForPatientSet(variantSet, patientSet); + Collection filteredVariantSet = bucketIndexBySampleChr4.filterVariantSetForPatientSet(variantSet, patientSet); assertTrue("Patients should not match any variants in the list", filteredVariantSet.isEmpty()); } + @Test - public void test_filterVariantSetForPatientSet_PatientsWithNoVariantsFirstBucketNoCall() throws IOException { + public void filterVariantSetForPatientSet_variantsDoNotExist_returnNoVariants() throws IOException { System.out.println("test_filterVariantSetForPatientSet_PatientsWithNoVariantsFirstBucket"); - variantSet.add("chr20,5032061,A,G,LOC102723996,missense_variant"); - variantSet.add("chr21,5032061,A,G,ABCDEF123456,synonymous_variant"); + variantSet.add(spec6); + variantSet.add(spec6b); patientSet.add(197506); 
patientSet.add(197508); patientSet.add(197509); - Collection filteredVariantSet = bucketIndexBySample.filterVariantSetForPatientSet(variantSet, patientSet); + Collection filteredVariantSet = bucketIndexBySampleChr4.filterVariantSetForPatientSet(variantSet, patientSet); assertTrue("Patients should not match any variants in the list", filteredVariantSet.isEmpty()); } @@ -153,7 +160,7 @@ public void test_filterVariantSetForPatientSet_allValidFirstBucket() throws IOEx patientSet.add(200689); patientSet.add(200972); - Collection filteredVariantSet = bucketIndexBySample.filterVariantSetForPatientSet(variantSet, patientSet); + Collection filteredVariantSet = bucketIndexBySampleChr21.filterVariantSetForPatientSet(variantSet, patientSet); assertEquals("No variants should be filtered out", 2, filteredVariantSet.size()); } @@ -168,28 +175,10 @@ public void test_filterVariantSetForPatientSet_allValidFirstBucketWithNoCall() t patientSet.add(200689); patientSet.add(200972); - Collection filteredVariantSet = bucketIndexBySample.filterVariantSetForPatientSet(variantSet, patientSet); + Collection filteredVariantSet = bucketIndexBySampleChr20.filterVariantSetForPatientSet(variantSet, patientSet); assertEquals("No variants should be filtered out", 2, filteredVariantSet.size()); } - - @Test - @Disabled - public void test_filterVariantSetForPatientSet_someValid() throws IOException { - System.out.println("test_filterVariantSetForPatientSet_someValid"); - - variantSet.add(spec2); - variantSet.add(spec6); - - patientSet.add(200392); - patientSet.add(200689); - patientSet.add(200972); - - Collection filteredVariantSet = bucketIndexBySample.filterVariantSetForPatientSet(variantSet, patientSet); - - assertEquals("One variant should be filtered out", 1, filteredVariantSet.size()); - assertTrue("Expected variant not found", filteredVariantSet.contains(spec1)); - } @Test public void test_filterVariantSetForPatientSet_allValidDifferentPatients() throws IOException { @@ -205,7 +194,8 @@ public void test_filterVariantSetForPatientSet_allValidDifferentPatients() throw patientSet.add(200710); patientSet.add(198206); - Collection filteredVariantSet = bucketIndexBySample.filterVariantSetForPatientSet(variantSet, patientSet); + Collection filteredVariantSet = bucketIndexBySampleChr4.filterVariantSetForPatientSet(variantSet, patientSet); + filteredVariantSet.addAll(bucketIndexBySampleChr21.filterVariantSetForPatientSet(variantSet, patientSet)); assertEquals("No variants should be filtered out", (long)4, (long)filteredVariantSet.size()); } @@ -226,8 +216,9 @@ public void test_filterVariantSetForPatientSet_someValidDifferentPatients() thro patientSet.add(9); patientSet.add(10); - Collection filteredVariantSet = bucketIndexBySample.filterVariantSetForPatientSet(variantSet, patientSet); - + Collection filteredVariantSet = bucketIndexBySampleChr4.filterVariantSetForPatientSet(variantSet, patientSet); + filteredVariantSet.addAll(bucketIndexBySampleChr21.filterVariantSetForPatientSet(variantSet, patientSet)); + assertEquals("One variant should be filtered out", (long)4, (long)filteredVariantSet.size()); assertFalse("Spec 9 should have been filtered out", filteredVariantSet.contains(spec9)); } diff --git a/service/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/test/VariantMetadataIndexTest.java b/service/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/test/VariantMetadataIndexTest.java index eb746408..7a53ac60 100644 --- a/service/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/test/VariantMetadataIndexTest.java +++ 
b/service/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/test/VariantMetadataIndexTest.java @@ -23,8 +23,10 @@ public class VariantMetadataIndexTest { /** * The metadataIndex is non-mutable (or should be) so we only need one object to test */ - private static VariantMetadataIndex vmi; - public static String binFile = "target/VariantMetadata.javabin"; + private static VariantMetadataIndex variantMetadataIndexChr4; + private static VariantMetadataIndex variantMetadataIndexChr21; + public static String chr4BinFile = "target/all/chr4/VariantMetadata.javabin"; + public static String chr21BinFile = "target/all/chr21/VariantMetadata.javabin"; VariantBucketHolder bucketCache = new VariantBucketHolder(); //Some known variant specs from the input file. These have been designed for testing partially overlapping specs @@ -36,11 +38,18 @@ public class VariantMetadataIndexTest { @BeforeAll - public static void initializeBinfile() throws Exception { + public static void initializeBinfiles() throws Exception { BuildIntegrationTestEnvironment instance = BuildIntegrationTestEnvironment.INSTANCE; - if(new File(binFile).exists()) { - try(ObjectInputStream in = new ObjectInputStream(new GZIPInputStream(new FileInputStream(binFile)))){ - vmi = (VariantMetadataIndex) in.readObject(); + if(new File(chr4BinFile).exists()) { + try(ObjectInputStream in = new ObjectInputStream(new GZIPInputStream(new FileInputStream(chr4BinFile)))){ + variantMetadataIndexChr4 = (VariantMetadataIndex) in.readObject(); + }catch(Exception e) { + e.printStackTrace(); + } + } + if(new File(chr21BinFile).exists()) { + try(ObjectInputStream in = new ObjectInputStream(new GZIPInputStream(new FileInputStream(chr21BinFile)))){ + variantMetadataIndexChr21 = (VariantMetadataIndex) in.readObject(); }catch(Exception e) { e.printStackTrace(); } @@ -51,7 +60,7 @@ public static void initializeBinfile() throws Exception { public void findByMultipleVariantSpec_invalidSpec() { List variants = List.of("chr21,5032061,A,G,NOTAGENE,missense_variant"); Map> expectedResult = Map.of(); - Map> data = vmi.findByMultipleVariantSpec(variants); + Map> data = variantMetadataIndexChr21.findByMultipleVariantSpec(variants); assertEquals("Wrong number of records in response.", 1, data.size()); assertEquals("The expected values were not found.", Set.of(), data.get("chr21,5032061,A,G,NOTAGENE,missense_variant")); @@ -62,7 +71,7 @@ public void findByMultipleVariantSpec_validSpec() { Map> expectedResult = Map.of( "chr21,5032061,A,G,LOC102723996,missense_variant" , Set.of("Gene_with_variant=LOC102723996;Variant_severity=MODERATE;Variant_consequence_calculated=missense_variant;Variant_class=SNV;Variant_frequency_in_gnomAD=0.0001346;Variant_frequency_as_text=Rare")); - Map> data = vmi.findByMultipleVariantSpec(variants); + Map> data = variantMetadataIndexChr21.findByMultipleVariantSpec(variants); assertEquals("Wrong number of records in response.", data.size(), 1); variants.stream().forEach(variant->{ @@ -77,7 +86,7 @@ public void findByMultipleVariantSpec_validSpecs() { , Set.of("Gene_with_variant=LOC102723996;Variant_severity=MODERATE;Variant_consequence_calculated=missense_variant;Variant_class=SNV;Variant_frequency_in_gnomAD=0.0001346;Variant_frequency_as_text=Rare") ,"chr21,5033914,A,G,LOC102723996,missense_variant" , Set.of("Gene_with_variant=LOC102723996;Variant_severity=MODERATE;Variant_consequence_calculated=missense_variant;Variant_class=SNV;Variant_frequency_in_gnomAD=0.0009728;Variant_frequency_as_text=Rare")); - Map> data = 
vmi.findByMultipleVariantSpec(variants); + Map> data = variantMetadataIndexChr21.findByMultipleVariantSpec(variants); assertEquals("Wrong number of records in response.", data.size(), 2); variants.stream().forEach(variant->{ @@ -92,7 +101,7 @@ public void testMultipleVariantSpecSamePOS() { Map> expectedResult = Map.of( spec1, Set.of(spec1Info), spec4, Set.of(spec4Info)); - Map> data = vmi.findByMultipleVariantSpec(variants); + Map> data = variantMetadataIndexChr4.findByMultipleVariantSpec(variants); assertEquals("Wrong number of records in response.", data.size(), 2); variants.stream().forEach(variant->{ @@ -106,7 +115,7 @@ public void testMultipleVariantSpecSamePOSAndREF() { Map> expectedResult = Map.of( spec1, Set.of(spec1Info), spec5, Set.of(spec5Info)); - Map> data = vmi.findByMultipleVariantSpec(variants); + Map> data = variantMetadataIndexChr4.findByMultipleVariantSpec(variants); assertEquals("Wrong number of records in response.", data.size(), 2); variants.stream().forEach(variant->{ @@ -120,7 +129,7 @@ public void testMultipleVariantSpecSamePOSAndALT() { Map> expectedResult = Map.of( spec1, Set.of(spec1Info), spec2, Set.of(spec2Info)); - Map> data = vmi.findByMultipleVariantSpec(variants); + Map> data = variantMetadataIndexChr4.findByMultipleVariantSpec(variants); assertEquals("Wrong number of records in response.", data.size(), 2); variants.stream().forEach(variant->{ @@ -137,7 +146,7 @@ public void testMultipleVariantSpecSameSpec() { List variants = List.of(spec1, spec1); Map> expectedResult = Map.of( spec1, Set.of(spec1Info)); - Map> data = vmi.findByMultipleVariantSpec(variants); + Map> data = variantMetadataIndexChr4.findByMultipleVariantSpec(variants); assertEquals("Wrong number of records in response.", data.size(), 1); variants.stream().forEach(variant->{ From 0d6ecc5539c1c5a56c48cccaf578b466fb7335e9 Mon Sep 17 00:00:00 2001 From: Ryan Amari Date: Thu, 27 Feb 2025 15:45:34 -0500 Subject: [PATCH 06/25] Remove duplicate code --- .../SplitChromosomeVariantMetadataLoader.java | 73 --------------- .../etl/genotype/VariantMetadataLoader.java | 77 +--------------- .../hpds/etl/genotype/VcfInputFile.java | 88 +++++++++++++++++++ 3 files changed, 90 insertions(+), 148 deletions(-) create mode 100644 etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/VcfInputFile.java diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/SplitChromosomeVariantMetadataLoader.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/SplitChromosomeVariantMetadataLoader.java index 820c3c7b..76ddeb06 100644 --- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/SplitChromosomeVariantMetadataLoader.java +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/SplitChromosomeVariantMetadataLoader.java @@ -38,7 +38,6 @@ public class SplitChromosomeVariantMetadataLoader { private static String hpdsDataPath = null; private static final int - INFO_COLUMN = 7, ANNOTATED_FLAG_COLUMN = 2, GZIP_FLAG_COLUMN=3, FILE_COLUMN = 0; @@ -104,76 +103,4 @@ private static void createVariantMetadataIndexForContig(VcfInputFile vcfInput) { throw new UncheckedIOException(e); } } - - private static class VcfInputFile implements Comparable { - - Iterator iterator; - CSVParser parser; - - String fileName; - String currentContig; - String currentVariantSpec; - String currentMetaData; - - /** - * read in an vcfFile, skip the header rows, and queue up the first variant (with metadata) - * @param vcfFile - * @param gzipped - */ - public 
VcfInputFile(File vcfFile, boolean gzipped) { - fileName = vcfFile.getName(); - log.info("Processing VCF file: " + fileName); - try{ - Reader reader = new InputStreamReader( gzipped ? new GZIPInputStream(new FileInputStream(vcfFile)) : new FileInputStream(vcfFile)); - parser = new CSVParser(reader, CSVFormat.DEFAULT.withDelimiter('\t').withSkipHeaderRecord(false)); - - iterator = parser.iterator(); - while(iterator.hasNext()) { - CSVRecord csvRecord = iterator.next(); - //skip all header rows - if(csvRecord.get(0).startsWith("#")) { - continue; - } - - VariantSpec variantSpec = new VariantSpec(csvRecord); - currentContig = variantSpec.metadata.chromosome; - currentVariantSpec = variantSpec.specNotation(); - currentMetaData = csvRecord.get(INFO_COLUMN).trim(); - break; - } - - }catch(IOException e) { - log.error("Error processing VCF file: " + vcfFile.getName(), e); - } - - } - - boolean hasNextVariant() { - return iterator.hasNext(); - } - - void nextVariant() { - CSVRecord csvRecord = iterator.next(); - //skip all header rows - if(csvRecord.get(0).startsWith("#")) { - return; - } - - VariantSpec variantSpec = new VariantSpec(csvRecord); - currentContig = variantSpec.metadata.chromosome; - currentVariantSpec = variantSpec.specNotation(); - currentMetaData = csvRecord.get(INFO_COLUMN).trim(); - } - - /** - * These files will be sorted by the current variant spec. We need to make sure they are never actually - * equal values (since the TreeSet used to keep them sorted enforces uniqueness) - */ - @Override - public int compareTo(VcfInputFile arg0) { - return (currentVariantSpec + iterator.toString()).compareTo(arg0.currentVariantSpec + arg0.iterator.toString()); - } - - - } } diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/VariantMetadataLoader.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/VariantMetadataLoader.java index 97d897fd..f4b5a81e 100644 --- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/VariantMetadataLoader.java +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/VariantMetadataLoader.java @@ -31,9 +31,8 @@ public class VariantMetadataLoader { //fields to allow tests to override default file location private static String storagePathForTests = null; private static String variantIndexPathForTests = null; - - private static final int - INFO_COLUMN = 7, + + private static final int ANNOTATED_FLAG_COLUMN = 2, GZIP_FLAG_COLUMN=3, FILE_COLUMN = 0; @@ -102,76 +101,4 @@ public static void main(String[] args) throws IOException { out.flush(); } } - - private static class VcfInputFile implements Comparable { - - Iterator iterator; - CSVParser parser; - - String fileName; - String currentContig; - String currentVariantSpec; - String currentMetaData; - - /** - * read in an vcfFile, skip the header rows, and queue up the first variant (with metadata) - * @param vcfFile - * @param gzipped - */ - public VcfInputFile(File vcfFile, boolean gzipped) { - fileName = vcfFile.getName(); - log.info("Processing VCF file: " + fileName); - try{ - Reader reader = new InputStreamReader( gzipped ? 
new GZIPInputStream(new FileInputStream(vcfFile)) : new FileInputStream(vcfFile)); - parser = new CSVParser(reader, CSVFormat.DEFAULT.withDelimiter('\t').withSkipHeaderRecord(false)); - - iterator = parser.iterator(); - while(iterator.hasNext()) { - CSVRecord csvRecord = iterator.next(); - //skip all header rows - if(csvRecord.get(0).startsWith("#")) { - continue; - } - - VariantSpec variantSpec = new VariantSpec(csvRecord); - currentContig = variantSpec.metadata.chromosome; - currentVariantSpec = variantSpec.specNotation(); - currentMetaData = csvRecord.get(INFO_COLUMN).trim(); - break; - } - - }catch(IOException e) { - log.error("Error processing VCF file: " + vcfFile.getName(), e); - } - - } - - boolean hasNextVariant() { - return iterator.hasNext(); - } - - void nextVariant() { - CSVRecord csvRecord = iterator.next(); - //skip all header rows - if(csvRecord.get(0).startsWith("#")) { - return; - } - - VariantSpec variantSpec = new VariantSpec(csvRecord); - currentContig = variantSpec.metadata.chromosome; - currentVariantSpec = variantSpec.specNotation(); - currentMetaData = csvRecord.get(INFO_COLUMN).trim(); - } - - /** - * These files will be sorted by the current variant spec. We need to make sure they are never actually - * equal values (since the TreeSet used to keep them sorted enforces uniqueness) - */ - @Override - public int compareTo(VcfInputFile arg0) { - return (currentVariantSpec + iterator.toString()).compareTo(arg0.currentVariantSpec + arg0.iterator.toString()); - } - - - } } diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/VcfInputFile.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/VcfInputFile.java new file mode 100644 index 00000000..eb7fc87c --- /dev/null +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/VcfInputFile.java @@ -0,0 +1,88 @@ +package edu.harvard.hms.dbmi.avillach.hpds.etl.genotype; + +import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantSpec; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVParser; +import org.apache.commons.csv.CSVRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.util.Iterator; +import java.util.zip.GZIPInputStream; + +public class VcfInputFile implements Comparable { + + private static final int INFO_COLUMN = 7; + + private static Logger log = LoggerFactory.getLogger(VcfInputFile.class); + + Iterator iterator; + CSVParser parser; + + String fileName; + String currentContig; + String currentVariantSpec; + String currentMetaData; + + /** + * read in an vcfFile, skip the header rows, and queue up the first variant (with metadata) + * @param vcfFile + * @param gzipped + */ + public VcfInputFile(File vcfFile, boolean gzipped) { + fileName = vcfFile.getName(); + log.info("Processing VCF file: " + fileName); + try{ + Reader reader = new InputStreamReader( gzipped ? 
new GZIPInputStream(new FileInputStream(vcfFile)) : new FileInputStream(vcfFile)); + parser = new CSVParser(reader, CSVFormat.DEFAULT.withDelimiter('\t').withSkipHeaderRecord(false)); + + iterator = parser.iterator(); + while(iterator.hasNext()) { + CSVRecord csvRecord = iterator.next(); + //skip all header rows + if(csvRecord.get(0).startsWith("#")) { + continue; + } + + VariantSpec variantSpec = new VariantSpec(csvRecord); + currentContig = variantSpec.metadata.chromosome; + currentVariantSpec = variantSpec.specNotation(); + currentMetaData = csvRecord.get(INFO_COLUMN).trim(); + break; + } + + }catch(IOException e) { + log.error("Error processing VCF file: " + vcfFile.getName(), e); + } + + } + + boolean hasNextVariant() { + return iterator.hasNext(); + } + + void nextVariant() { + CSVRecord csvRecord = iterator.next(); + //skip all header rows + if(csvRecord.get(0).startsWith("#")) { + return; + } + + VariantSpec variantSpec = new VariantSpec(csvRecord); + currentContig = variantSpec.metadata.chromosome; + currentVariantSpec = variantSpec.specNotation(); + currentMetaData = csvRecord.get(INFO_COLUMN).trim(); + } + + /** + * These files will be sorted by the current variant spec. We need to make sure they are never actually + * equal values (since the TreeSet used to keep them sorted enforces uniqueness) + */ + @Override + public int compareTo(VcfInputFile arg0) { + return (currentVariantSpec + iterator.toString()).compareTo(arg0.currentVariantSpec + arg0.iterator.toString()); + } + + +} \ No newline at end of file From cb1723979d4515f6b4c54d690c7f1d5311128697 Mon Sep 17 00:00:00 2001 From: Ryan Amari Date: Thu, 27 Feb 2025 15:58:16 -0500 Subject: [PATCH 07/25] Refactor genomic config --- .../SplitChromosomeVariantMetadataLoader.java | 2 +- .../processing/GenomicProcessorConfig.java | 35 +++++++++---------- 2 files changed, 18 insertions(+), 19 deletions(-) diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/SplitChromosomeVariantMetadataLoader.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/SplitChromosomeVariantMetadataLoader.java index 76ddeb06..b17b8188 100644 --- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/SplitChromosomeVariantMetadataLoader.java +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/SplitChromosomeVariantMetadataLoader.java @@ -37,7 +37,7 @@ public class SplitChromosomeVariantMetadataLoader { //fields to allow tests to override default file location private static String hpdsDataPath = null; - private static final int + private static final int ANNOTATED_FLAG_COLUMN = 2, GZIP_FLAG_COLUMN=3, FILE_COLUMN = 0; diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorConfig.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorConfig.java index 1fc74559..0a191536 100644 --- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorConfig.java +++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorConfig.java @@ -34,19 +34,7 @@ public GenomicProcessor localGenomicProcessor() { @Bean(name = "localDistributedGenomicProcessor") @ConditionalOnProperty(prefix = "hpds.genomicProcessor", name = "impl", havingValue = "localDistributed") public GenomicProcessor localDistributedGenomicProcessor() { - // assumed for now that all first level directories contain a genomic dataset for a group of studies - File[] directories = new 
File(hpdsGenomicDataDirectory).listFiles(File::isDirectory); - if (directories.length > 50) { - throw new IllegalArgumentException("Number of chromosome partitions exceeds maximum of 50 (" + directories.length + ")"); - } - - List genomicProcessors = Flux.fromArray(directories) - .flatMap(i -> Mono.fromCallable(() -> { - return (GenomicProcessor) new GenomicProcessorNodeImpl(i.getAbsolutePath() + "/"); - }).subscribeOn(Schedulers.boundedElastic())) - .collectList() - .block(); - //genomicProcessors.add(new GenomicProcessorNodeImpl(hpdsGenomicDataDirectory + "/X/")); + List genomicProcessors = getGenomicProcessors(new File(hpdsGenomicDataDirectory)); return new GenomicProcessorParentImpl(genomicProcessors); } @@ -62,16 +50,27 @@ public GenomicProcessor localPatientDistributedGenomicProcessor() { List studyGroupedGenomicProcessors = new ArrayList<>(); for (File directory : directories) { - List genomicProcessors = Flux.range(1, 22) - .flatMap(i -> Mono.fromCallable(() -> (GenomicProcessor) new GenomicProcessorNodeImpl(directory.getAbsolutePath() + "/" + i + "/")).subscribeOn(Schedulers.boundedElastic())) - .collectList() - .block(); - genomicProcessors.add(new GenomicProcessorNodeImpl(directory + "/X/")); + List genomicProcessors = getGenomicProcessors(directory); studyGroupedGenomicProcessors.add(new GenomicProcessorParentImpl(genomicProcessors)); } return new GenomicProcessorPatientMergingParentImpl(studyGroupedGenomicProcessors); } + + private static List getGenomicProcessors(File directory) { + File[] secondLevelDirectories = directory.listFiles(File::isDirectory); + if (secondLevelDirectories.length > 50) { + throw new IllegalArgumentException("Number of chromosome partitions exceeds maximum of 50 (" + secondLevelDirectories.length + ")"); + } + + return Flux.fromArray(secondLevelDirectories) + .flatMap(i -> Mono.fromCallable(() -> { + return (GenomicProcessor) new GenomicProcessorNodeImpl(i.getAbsolutePath() + "/"); + }).subscribeOn(Schedulers.boundedElastic())) + .collectList() + .block(); + } + @Bean(name = "localPatientOnlyDistributedGenomicProcessor") @ConditionalOnProperty(prefix = "hpds.genomicProcessor", name = "impl", havingValue = "localPatientOnlyDistributed") public GenomicProcessor localPatientOnlyDistributedGenomicProcessor() { From ac16836115b254fe561b064048f9ddd08934119d Mon Sep 17 00:00:00 2001 From: Ryan Amari Date: Fri, 28 Feb 2025 14:22:13 -0500 Subject: [PATCH 08/25] ALS-7737: Cleanup hard to follow loops --- .../genotype/SplitChromosomeVcfLoader.java | 44 ++++++++----------- 1 file changed, 18 insertions(+), 26 deletions(-) diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/SplitChromosomeVcfLoader.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/SplitChromosomeVcfLoader.java index 69e8b091..1949653c 100644 --- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/SplitChromosomeVcfLoader.java +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/SplitChromosomeVcfLoader.java @@ -92,13 +92,11 @@ protected void loadVCFs() throws IOException { new File(mergedDirStr).mkdirs(); variantIndexBuilder = new VariantIndexBuilder(); variantMaskStorage = new TreeMap<>(); - walkers = new ArrayList<>(); - walkers.add(walker); - loadSingleContig(); + loadSingleContig(walker); } } - private void loadSingleContig() throws IOException { + private void loadSingleContig(VCFWalker walker) throws IOException { VariantStore store = new VariantStore(); store.setPatientIds(allPatientIds.stream().map((id) -> 
{ return id.toString(); @@ -115,26 +113,23 @@ private void loadSingleContig() throws IOException { zygosityMaskStrings = new HashMap(); List positionsProcessedInChunk = new ArrayList<>(); - while (walkers.parallelStream().anyMatch(walker -> { - return walker.hasNext; - })) { - Collections.sort(walkers); - VCFWalker lowestWalker = walkers.get(0); - String currentSpecNotation = lowestWalker.currentSpecNotation(); - currentContig[0] = lowestWalker.currentContig; - currentPosition[0] = lowestWalker.currentPosition; - currentRef[0] = lowestWalker.currentRef; - currentAlt[0] = lowestWalker.currentAlt; - currentChunk = lowestWalker.currentPosition / CHUNK_SIZE; + + while (walker.hasNext) { + String currentSpecNotation = walker.currentSpecNotation(); + currentContig[0] = walker.currentContig; + currentPosition[0] = walker.currentPosition; + currentRef[0] = walker.currentRef; + currentAlt[0] = walker.currentAlt; + currentChunk = walker.currentPosition / CHUNK_SIZE; positionsProcessedInChunk.add(currentPosition[0]); if (lastContigProcessed == null) { - lastContigProcessed = lowestWalker.currentContig; + lastContigProcessed = walker.currentContig; } flipChunk(lastContigProcessed, lastChunkProcessed, currentChunk, currentContig[0], false, - lowestWalker.currentLine); - lastContigProcessed = lowestWalker.currentContig; + walker.currentLine); + lastContigProcessed = walker.currentContig; lastChunkProcessed = currentChunk; char[][][] maskStringsForVariantSpec = { zygosityMaskStrings.get(currentSpecNotation) }; @@ -147,21 +142,18 @@ private void loadSingleContig() throws IOException { } } } - walkers.stream().filter((walker) -> { - return walker.currentPosition == currentPosition[0] && walker.currentAlt == currentAlt[0] - && walker.currentRef == currentRef[0] && walker.currentContig.contentEquals(currentContig[0]); - }).forEach((walker) -> { + + while (walker.currentPosition == currentPosition[0] && walker.currentAlt == currentAlt[0] + && Objects.equals(walker.currentRef, currentRef[0]) && walker.currentContig.contentEquals(currentContig[0]) && walker.hasNext) { walker.updateRecords(maskStringsForVariantSpec[0], infoStoreMap); try { walker.nextLine(); } catch (IOException e) { throw new UncheckedIOException(e); } - }); + } + zygosityMaskStrings.put(currentSpecNotation, maskStringsForVariantSpec[0]); - walkers = walkers.parallelStream().filter((walker) -> { - return walker.hasNext; - }).collect(Collectors.toList()); } flipChunk(lastContigProcessed, lastChunkProcessed, currentChunk, currentContig[0], true, null); From 7cd7900b01fc542b55413444e6ad7da3d4c38430 Mon Sep 17 00:00:00 2001 From: Ryan Amari Date: Mon, 3 Mar 2025 11:11:37 -0500 Subject: [PATCH 09/25] Update build references for new vcf loader --- docker/pic-sure-hpds-etl/Dockerfile | 1 + etl/pom.xml | 20 ++++++++++++++++++++ 2 files changed, 21 insertions(+) diff --git a/docker/pic-sure-hpds-etl/Dockerfile b/docker/pic-sure-hpds-etl/Dockerfile index f160b120..203a0f40 100644 --- a/docker/pic-sure-hpds-etl/Dockerfile +++ b/docker/pic-sure-hpds-etl/Dockerfile @@ -21,6 +21,7 @@ COPY --from=build /app/docker/pic-sure-hpds-etl/CSVLoader-jar-with-dependencies. COPY --from=build /app/docker/pic-sure-hpds-etl/CSVLoaderNewSearch-jar-with-dependencies.jar . COPY --from=build /app/docker/pic-sure-hpds-etl/CSVDumper-jar-with-dependencies.jar . COPY --from=build /app/docker/pic-sure-hpds-etl/VCFLocalLoader-jar-with-dependencies.jar . +COPY --from=build /app/docker/pic-sure-hpds-etl/SplitChromosomeVcfLoader-jar-with-dependencies.jar . 
COPY --from=build /app/docker/pic-sure-hpds-etl/VariantMetadataLoader-jar-with-dependencies.jar . COPY --from=build /app/docker/pic-sure-hpds-etl/UnifiedVCFLocalLoader-jar-with-dependencies.jar . COPY --from=build /app/docker/pic-sure-hpds-etl/MultialleleCounter-jar-with-dependencies.jar . diff --git a/etl/pom.xml b/etl/pom.xml index c218d06a..6095ce80 100644 --- a/etl/pom.xml +++ b/etl/pom.xml @@ -110,6 +110,26 @@ single + + buildSplitChromosomeVcfLoader + + + + edu.harvard.hms.dbmi.avillach.hpds.etl.genotype.SplitChromosomeVcfLoader + + + ${project.basedir}/../docker/pic-sure-hpds-etl + + jar-with-dependencies + + SplitChromosomeVcfLoader + SplitChromosomeVcfLoader + + package + + single + + buildPerPatientUnifiedVCFLocalLoader From da65d98ea578d2fe7cb288479d3d657449c4108e Mon Sep 17 00:00:00 2001 From: Ryan Amari Date: Tue, 4 Mar 2025 08:52:53 -0500 Subject: [PATCH 10/25] Fix typo --- .../genotype/SplitChromosomeVcfLoader.java | 2 +- .../hpds/etl/genotype/VCFIndexBuilder.java | 21 ++++++++++--------- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/SplitChromosomeVcfLoader.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/SplitChromosomeVcfLoader.java index 1949653c..6b87348b 100644 --- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/SplitChromosomeVcfLoader.java +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/SplitChromosomeVcfLoader.java @@ -47,7 +47,7 @@ public static void main(String[] args) throws IOException { } else { logger.info(args.length + " arguments provided"); logger.info("Using default values"); - vcfLoader = new NewVCFLoader(); + vcfLoader = new SplitChromosomeVcfLoader(); } vcfLoader.loadAndMerge(); diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/VCFIndexBuilder.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/VCFIndexBuilder.java index 9dbda27a..668cae91 100644 --- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/VCFIndexBuilder.java +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/VCFIndexBuilder.java @@ -99,22 +99,23 @@ private void writeVcfIndexes() { writeVcfIndex(vcfGroup, groupToVcfMapping.get(vcfGroup)); }); System.out.println(groupToVcfMapping.size()); - - } private void writeVcfIndex(String vcfGroup, List vcfFiles) { try { FileWriter fileWriter = new FileWriter(vcfIndexOutputDirectory.getAbsolutePath() + "/" + vcfGroup + "-vcfIndex.tsv"); fileWriter.write("\"vcf_path\"\t\"chromosome\"\t\"isAnnotated\"\t\"isGzipped\"\t\"sample_ids\"\t\"patient_ids\"\t\"sample_relationship\"\t\"related_sample_ids\"\n"); - String vcfFile = vcfFiles.get(0); - Set validPatientUUIDs = fileToPatientListMap.get(vcfFile) - .stream() - .filter(patientUUIDToPatientIdMapping::containsKey) - .collect(Collectors.toSet()); - List patentIds = validPatientUUIDs.stream().map(patientUUIDToPatientIdMapping::get).filter(Objects::nonNull).toList(); - fileWriter.write("\"" + VCF_INDEX_DIRECTORY + "/" + vcfFile + "\"\t\"" + extractChromosome(vcfFile) + "\"\t\"1\"\t\"1\"\t"); - fileWriter.write("\"" + COMMA_JOINER.join(validPatientUUIDs) + "\"\t\"" + COMMA_JOINER.join(patentIds) + "\""); + + for (String vcfFile : vcfFiles) { + Set validPatientUUIDs = fileToPatientListMap.get(vcfFile) + .stream() + .filter(patientUUIDToPatientIdMapping::containsKey) + .collect(Collectors.toSet()); + List patentIds = 
validPatientUUIDs.stream().map(patientUUIDToPatientIdMapping::get).filter(Objects::nonNull).toList(); + fileWriter.write("\"" + VCF_INDEX_DIRECTORY + "/" + vcfFile + "\"\t\"" + extractChromosome(vcfFile) + "\"\t\"1\"\t\"1\"\t"); + fileWriter.write("\"" + COMMA_JOINER.join(validPatientUUIDs) + "\"\t\"" + COMMA_JOINER.join(patentIds) + "\"\n"); + } + fileWriter.flush(); fileWriter.close(); } catch (IOException e) { From 73f4c639354e68babf46026ca127dc6cbdcfb2c9 Mon Sep 17 00:00:00 2001 From: Ryan Amari Date: Tue, 4 Mar 2025 10:16:26 -0500 Subject: [PATCH 11/25] Update genomic config for bch --- service/src/main/resources/application-bch-dev.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/src/main/resources/application-bch-dev.properties b/service/src/main/resources/application-bch-dev.properties index 9a3bedcd..512f3b16 100644 --- a/service/src/main/resources/application-bch-dev.properties +++ b/service/src/main/resources/application-bch-dev.properties @@ -3,5 +3,5 @@ SMALL_TASK_THREADS = 1 LARGE_TASK_THREADS = 1 VCF_EXCERPT_ENABLED=true -hpds.genomicProcessor.impl=localPatientOnlyDistributed +hpds.genomicProcessor.impl=localPatientDistributedGenomicProcessor HPDS_GENOMIC_DATA_DIRECTORY=/opt/local/hpds/all/ \ No newline at end of file From b91771f539b1a61d2e65528e4fc6edb9b6cbf712 Mon Sep 17 00:00:00 2001 From: Luke Sikina Date: Tue, 4 Mar 2025 10:29:31 -0500 Subject: [PATCH 12/25] toggle filesharing --- .../harvard/hms/dbmi/avillach/hpds/service/PicSureService.java | 2 +- service/src/main/resources/application.properties | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/PicSureService.java b/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/PicSureService.java index dd150d7c..ab76fbdd 100644 --- a/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/PicSureService.java +++ b/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/PicSureService.java @@ -264,7 +264,7 @@ public ResponseEntity writeQueryResult( .status(400) .body("The query pic-sure ID is not a UUID"); } - if (query.getExpectedResultType() != ResultType.DATAFRAME_TIMESERIES) { + if (!List.of(ResultType.DATAFRAME_TIMESERIES, ResultType.PATIENTS).contains(query.getExpectedResultType())) { return ResponseEntity .status(400) .body("The write endpoint only writes time series dataframes. 
Fix result type."); diff --git a/service/src/main/resources/application.properties b/service/src/main/resources/application.properties index d2dc8be4..87ebec84 100644 --- a/service/src/main/resources/application.properties +++ b/service/src/main/resources/application.properties @@ -1,3 +1,4 @@ SMALL_JOB_LIMIT = 100 SMALL_TASK_THREADS = 1 -LARGE_TASK_THREADS = 1 \ No newline at end of file +LARGE_TASK_THREADS = 1 +enable_file_sharing=true \ No newline at end of file From 6a0cbfd4fdc2c48180e18145dc79f171da17312c Mon Sep 17 00:00:00 2001 From: Ryan Amari Date: Tue, 4 Mar 2025 13:28:25 -0500 Subject: [PATCH 13/25] Fix spring config --- service/src/main/resources/application-bch-dev.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/src/main/resources/application-bch-dev.properties b/service/src/main/resources/application-bch-dev.properties index 512f3b16..7a68ec35 100644 --- a/service/src/main/resources/application-bch-dev.properties +++ b/service/src/main/resources/application-bch-dev.properties @@ -3,5 +3,5 @@ SMALL_TASK_THREADS = 1 LARGE_TASK_THREADS = 1 VCF_EXCERPT_ENABLED=true -hpds.genomicProcessor.impl=localPatientDistributedGenomicProcessor +hpds.genomicProcessor.impl=localPatientDistributed HPDS_GENOMIC_DATA_DIRECTORY=/opt/local/hpds/all/ \ No newline at end of file From 21ef8ae358448ed0f859742a5b4c82607730da47 Mon Sep 17 00:00:00 2001 From: Ryan Amari Date: Tue, 4 Mar 2025 13:43:29 -0500 Subject: [PATCH 14/25] Fix tests --- .../main/resources/application-integration-test.properties | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/service/src/main/resources/application-integration-test.properties b/service/src/main/resources/application-integration-test.properties index 006af76d..5234a5e3 100644 --- a/service/src/main/resources/application-integration-test.properties +++ b/service/src/main/resources/application-integration-test.properties @@ -6,4 +6,6 @@ VCF_EXCERPT_ENABLED=true hpds.genomicProcessor.impl=localDistributed HPDS_GENOMIC_DATA_DIRECTORY=target/all/ -HPDS_DATA_DIRECTORY=target/test-classes/phenotypic/ \ No newline at end of file +HPDS_DATA_DIRECTORY=target/test-classes/phenotypic/ + +enable_file_sharing=false \ No newline at end of file From f625ea5396395ef5d7defc00dd81492886ddab8a Mon Sep 17 00:00:00 2001 From: Ryan Amari Date: Mon, 10 Mar 2025 09:19:31 -0400 Subject: [PATCH 15/25] Update variant metadata loader implementation --- etl/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/etl/pom.xml b/etl/pom.xml index 6095ce80..ae0b3315 100644 --- a/etl/pom.xml +++ b/etl/pom.xml @@ -275,7 +275,7 @@ - edu.harvard.hms.dbmi.avillach.hpds.etl.genotype.VariantMetadataLoader + edu.harvard.hms.dbmi.avillach.hpds.etl.genotype.SplitChromosomeVariantMetadataLoader ${project.basedir}/../docker/pic-sure-hpds-etl From 154002fa2694d249c8d8f9c11c801a21be70c1aa Mon Sep 17 00:00:00 2001 From: Ryan Amari Date: Thu, 13 Mar 2025 15:02:40 -0400 Subject: [PATCH 16/25] Attempt to fix variant explorer issue --- ...GenomicProcessorPatientMergingParentImpl.java | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorPatientMergingParentImpl.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorPatientMergingParentImpl.java index 42d65570..02c4f3e2 100644 --- 
a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorPatientMergingParentImpl.java +++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorPatientMergingParentImpl.java @@ -133,8 +133,20 @@ private List initializePatientIds() { @Override public Optional getMasks(String path, VariantBucketHolder variantMasksVariantBucketHolder) { - // TODO: implement this. only used in variant explorer - throw new RuntimeException("Method not implemented"); + VariableVariantMasks aggregatedMasks = null; + int size = 0; + for (GenomicProcessor node : nodes) { + VariableVariantMasks masks = node.getMasks(path, variantMasksVariantBucketHolder).orElseGet(VariableVariantMasks::new); + int nodeSize = node.getPatientIds().size(); + if (aggregatedMasks == null) { + aggregatedMasks = masks; + size = nodeSize; + } else { + aggregatedMasks = VariableVariantMasks.append(aggregatedMasks, size, masks, nodeSize); + size = size + nodeSize; + } + } + return Optional.of(aggregatedMasks); } @Override From 2ae548be2ec5a649b793b90d26644d17ca7955c2 Mon Sep 17 00:00:00 2001 From: Ryan Amari Date: Thu, 13 Mar 2025 15:58:53 -0400 Subject: [PATCH 17/25] Attempt to fix variant explorer issue --- .../processing/GenomicProcessorPatientMergingParentImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorPatientMergingParentImpl.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorPatientMergingParentImpl.java index 02c4f3e2..c05db882 100644 --- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorPatientMergingParentImpl.java +++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorPatientMergingParentImpl.java @@ -136,7 +136,7 @@ public Optional getMasks(String path, VariantBucketHolder< VariableVariantMasks aggregatedMasks = null; int size = 0; for (GenomicProcessor node : nodes) { - VariableVariantMasks masks = node.getMasks(path, variantMasksVariantBucketHolder).orElseGet(VariableVariantMasks::new); + VariableVariantMasks masks = node.getMasks(path, new VariantBucketHolder<>()).orElseGet(VariableVariantMasks::new); int nodeSize = node.getPatientIds().size(); if (aggregatedMasks == null) { aggregatedMasks = masks; From d4ad50e1e4a6a755dcc5f7dd7e34008211dbc815 Mon Sep 17 00:00:00 2001 From: Ryan Amari Date: Fri, 14 Mar 2025 13:09:40 -0400 Subject: [PATCH 18/25] Test coverage for variant explorer fix --- .../hpds/processing/VariantListProcessor.java | 1 - ...ProcessorPatientMergingParentImplTest.java | 146 ++++++++++++++++++ 2 files changed, 146 insertions(+), 1 deletion(-) diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/VariantListProcessor.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/VariantListProcessor.java index 7310e6f9..d4200d07 100644 --- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/VariantListProcessor.java +++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/VariantListProcessor.java @@ -265,7 +265,6 @@ public String runVcfExcerptQuery(Query query, boolean includePatientData) throws } } - // todo: deal with empty return VariableVariantMasks masks = abstractProcessor.getMasks(variantSpec, variantMaskBucketHolder).get(); //make strings of 000100 so we can just check 'char at' diff --git 
a/processing/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorPatientMergingParentImplTest.java b/processing/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorPatientMergingParentImplTest.java index 9f27111e..5901adc5 100644 --- a/processing/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorPatientMergingParentImplTest.java +++ b/processing/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorPatientMergingParentImplTest.java @@ -1,8 +1,10 @@ package edu.harvard.hms.dbmi.avillach.hpds.processing; +import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariableVariantMasks; import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantMask; import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantMaskBitmaskImpl; import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantMaskSparseImpl; +import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.caching.VariantBucketHolder; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -14,9 +16,12 @@ import java.math.BigInteger; import java.util.List; +import java.util.Optional; import java.util.Set; import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.when; @ExtendWith({MockitoExtension.class, OutputCaptureExtension.class}) @@ -145,4 +150,145 @@ public void createMaskForPatientSet_validResponsesOneEmpty_returnMerged() { VariantMask expectedPatientMask = new VariantMaskBitmaskImpl(new BigInteger("11000100000000011011", 2)); assertEquals(expectedPatientMask, patientMask); } + + + @Test + public void getMasks_validEmptyResponses_returnEmpty() { + String path = "chr21,5032061,A,Z,LOC102723996,missense_variant"; + when(mockProcessor1.getMasks(eq(path), any(VariantBucketHolder.class))) + .thenReturn(Optional.empty()); + when(mockProcessor2.getMasks(eq(path), any(VariantBucketHolder.class))) + .thenReturn(Optional.empty()); + when(mockProcessor3.getMasks(eq(path), any(VariantBucketHolder.class))) + .thenReturn(Optional.empty()); + + Optional masks = patientMergingParent.getMasks(path, new VariantBucketHolder<>()); + assertEquals(Optional.of(new VariableVariantMasks()), masks); + } + + @Test + public void getMasks_validEmptyAndNullResponses_returnEmpty() { + String path = "chr21,5032061,A,Z,LOC102723996,missense_variant"; + when(mockProcessor1.getMasks(eq(path), any(VariantBucketHolder.class))) + .thenReturn(Optional.empty()); + when(mockProcessor2.getMasks(eq(path), any(VariantBucketHolder.class))) + .thenReturn(Optional.of(new VariableVariantMasks())); + when(mockProcessor3.getMasks(eq(path), any(VariantBucketHolder.class))) + .thenReturn(Optional.empty()); + + Optional masks = patientMergingParent.getMasks(path, new VariantBucketHolder<>()); + assertEquals(Optional.of(new VariableVariantMasks()), masks); + } + + + @Test + public void getMasks_validResponses_returnMerged() { + String path = "chr21,5032061,A,Z,LOC102723996,missense_variant"; + VariableVariantMasks variableVariantMasks1 = new VariableVariantMasks(); + variableVariantMasks1.heterozygousMask = new VariantMaskBitmaskImpl(new BigInteger("11011011", 2)); + VariableVariantMasks variableVariantMasks2 = new VariableVariantMasks(); + variableVariantMasks2.heterozygousMask = new VariantMaskSparseImpl(Set.of(3, 4)); + VariableVariantMasks variableVariantMasks3 = new VariableVariantMasks(); 
+ variableVariantMasks3.heterozygousMask = new VariantMaskBitmaskImpl(new BigInteger("11000111", 2)); + + when(mockProcessor1.getPatientIds()).thenReturn(List.of("1", "2", "3", "4")); + when(mockProcessor2.getPatientIds()).thenReturn(List.of("5", "6", "7", "8", "9", "10", "11", "12")); + when(mockProcessor3.getPatientIds()).thenReturn(List.of("15", "16", "17", "18")); + + + when(mockProcessor1.getMasks(eq(path), any(VariantBucketHolder.class))) + .thenReturn(Optional.of(variableVariantMasks1)); + when(mockProcessor2.getMasks(eq(path), any(VariantBucketHolder.class))) + .thenReturn(Optional.of(variableVariantMasks2)); + when(mockProcessor3.getMasks(eq(path), any(VariantBucketHolder.class))) + .thenReturn(Optional.of(variableVariantMasks3)); + + Optional masks = patientMergingParent.getMasks(path, new VariantBucketHolder<>()); + + VariantMask expectedPatientMask = new VariantMaskBitmaskImpl(new BigInteger("11000100011000011011", 2)); + assertEquals(expectedPatientMask, masks.get().heterozygousMask); + } + @Test + public void getMasks_validResponsesSinglePartition_returnResult() { + String path = "chr21,5032061,A,Z,LOC102723996,missense_variant"; + VariableVariantMasks variableVariantMasks1 = new VariableVariantMasks(); + variableVariantMasks1.heterozygousMask = new VariantMaskBitmaskImpl(new BigInteger("11011011", 2)); + + when(mockProcessor1.getPatientIds()).thenReturn(List.of("1", "2", "3", "4")); + + + when(mockProcessor1.getMasks(eq(path), any(VariantBucketHolder.class))) + .thenReturn(Optional.of(variableVariantMasks1)); + + Optional masks = new GenomicProcessorPatientMergingParentImpl(List.of(mockProcessor1)).getMasks(path, new VariantBucketHolder<>()); + + assertEquals(variableVariantMasks1.heterozygousMask, masks.get().heterozygousMask); + } + + @Test + public void getMasks_validResponsesSinglePartitionEmpty_returnEmpty() { + String path = "chr21,5032061,A,Z,LOC102723996,missense_variant"; + + when(mockProcessor1.getPatientIds()).thenReturn(List.of("1", "2", "3", "4")); + when(mockProcessor1.getMasks(eq(path), any(VariantBucketHolder.class))) + .thenReturn(Optional.empty()); + + Optional masks = new GenomicProcessorPatientMergingParentImpl(List.of(mockProcessor1)).getMasks(path, new VariantBucketHolder<>()); + + assertNull(masks.get().heterozygousMask); + } + + @Test + public void getMasks_validAndEmptyResponses_returnMerged() { + String path = "chr21,5032061,A,Z,LOC102723996,missense_variant"; + VariableVariantMasks variableVariantMasks1 = new VariableVariantMasks(); + variableVariantMasks1.heterozygousMask = new VariantMaskBitmaskImpl(new BigInteger("11011011", 2)); + VariableVariantMasks variableVariantMasks3 = new VariableVariantMasks(); + variableVariantMasks3.heterozygousMask = new VariantMaskBitmaskImpl(new BigInteger("11000111", 2)); + + when(mockProcessor1.getPatientIds()).thenReturn(List.of("1", "2", "3", "4")); + when(mockProcessor2.getPatientIds()).thenReturn(List.of("5", "6", "7", "8", "9", "10", "11", "12")); + when(mockProcessor3.getPatientIds()).thenReturn(List.of("15", "16", "17", "18")); + + + when(mockProcessor1.getMasks(eq(path), any(VariantBucketHolder.class))) + .thenReturn(Optional.of(variableVariantMasks1)); + when(mockProcessor2.getMasks(eq(path), any(VariantBucketHolder.class))) + .thenReturn(Optional.empty()); + when(mockProcessor3.getMasks(eq(path), any(VariantBucketHolder.class))) + .thenReturn(Optional.of(variableVariantMasks3)); + + Optional masks = patientMergingParent.getMasks(path, new VariantBucketHolder<>()); + + VariantMask expectedPatientMask = 
new VariantMaskBitmaskImpl(new BigInteger("11000100000000011011", 2)); + assertEquals(expectedPatientMask, masks.get().heterozygousMask); + } + + @Test + public void getMasks_validResponsesHomozygous_returnMerged() { + String path = "chr21,5032061,A,Z,LOC102723996,missense_variant"; + VariableVariantMasks variableVariantMasks1 = new VariableVariantMasks(); + variableVariantMasks1.homozygousMask = new VariantMaskBitmaskImpl(new BigInteger("11011011", 2)); + VariableVariantMasks variableVariantMasks2 = new VariableVariantMasks(); + variableVariantMasks2.homozygousMask = new VariantMaskSparseImpl(Set.of(3, 4)); + VariableVariantMasks variableVariantMasks3 = new VariableVariantMasks(); + variableVariantMasks3.homozygousMask = new VariantMaskBitmaskImpl(new BigInteger("11000111", 2)); + + when(mockProcessor1.getPatientIds()).thenReturn(List.of("1", "2", "3", "4")); + when(mockProcessor2.getPatientIds()).thenReturn(List.of("5", "6", "7", "8", "9", "10", "11", "12")); + when(mockProcessor3.getPatientIds()).thenReturn(List.of("15", "16", "17", "18")); + + + when(mockProcessor1.getMasks(eq(path), any(VariantBucketHolder.class))) + .thenReturn(Optional.of(variableVariantMasks1)); + when(mockProcessor2.getMasks(eq(path), any(VariantBucketHolder.class))) + .thenReturn(Optional.of(variableVariantMasks2)); + when(mockProcessor3.getMasks(eq(path), any(VariantBucketHolder.class))) + .thenReturn(Optional.of(variableVariantMasks3)); + + Optional masks = patientMergingParent.getMasks(path, new VariantBucketHolder<>()); + + VariantMask expectedPatientMask = new VariantMaskBitmaskImpl(new BigInteger("11000100011000011011", 2)); + assertEquals(expectedPatientMask, masks.get().homozygousMask); + } } \ No newline at end of file From 5988056ed4d5bed53575bf09f6d1fecfd9e09879 Mon Sep 17 00:00:00 2001 From: Ryan Amari Date: Fri, 14 Mar 2025 13:25:52 -0400 Subject: [PATCH 19/25] Test coverage for variant explorer fix --- .../GenomicProcessorPatientMergingParentImplTest.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/processing/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorPatientMergingParentImplTest.java b/processing/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorPatientMergingParentImplTest.java index 5901adc5..70b38298 100644 --- a/processing/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorPatientMergingParentImplTest.java +++ b/processing/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/processing/GenomicProcessorPatientMergingParentImplTest.java @@ -290,5 +290,8 @@ public void getMasks_validResponsesHomozygous_returnMerged() { VariantMask expectedPatientMask = new VariantMaskBitmaskImpl(new BigInteger("11000100011000011011", 2)); assertEquals(expectedPatientMask, masks.get().homozygousMask); + assertNull(masks.get().heterozygousMask); + assertNull(masks.get().heterozygousNoCallMask); + assertNull(masks.get().homozygousNoCallMask); } } \ No newline at end of file From 4caf5e96206668f991547796ca4ed0c0eb714868 Mon Sep 17 00:00:00 2001 From: Ryan Amari Date: Fri, 14 Mar 2025 14:50:02 -0400 Subject: [PATCH 20/25] Changes from PR --- .../avillach/hpds/etl/genotype/SplitChromosomeVcfLoader.java | 2 +- .../hms/dbmi/avillach/hpds/etl/genotype/VCFIndexBuilder.java | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/SplitChromosomeVcfLoader.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/SplitChromosomeVcfLoader.java index 
6b87348b..2285d916 100644 --- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/SplitChromosomeVcfLoader.java +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/SplitChromosomeVcfLoader.java @@ -143,7 +143,7 @@ private void loadSingleContig(VCFWalker walker) throws IOException { } } - while (walker.currentPosition == currentPosition[0] && walker.currentAlt == currentAlt[0] + while (walker.currentPosition == currentPosition[0] && Objects.equals(walker.currentAlt, currentAlt[0]) && Objects.equals(walker.currentRef, currentRef[0]) && walker.currentContig.contentEquals(currentContig[0]) && walker.hasNext) { walker.updateRecords(maskStringsForVariantSpec[0], infoStoreMap); try { diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/VCFIndexBuilder.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/VCFIndexBuilder.java index 668cae91..9b14d049 100644 --- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/VCFIndexBuilder.java +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/VCFIndexBuilder.java @@ -86,7 +86,6 @@ private void writeVcfIndexes() { Map> groupToVcfMapping = new HashMap<>(); for (String fileName : fileToPatientListMap.keySet()) { - String chromosome = extractChromosome(fileName); String baseFile = fileName.substring(0, fileName.indexOf(".chr")); List vcfFiles = groupToVcfMapping.getOrDefault(baseFile, new ArrayList<>()); vcfFiles.add(fileName); @@ -98,7 +97,6 @@ private void writeVcfIndexes() { .forEach(vcfGroup -> { writeVcfIndex(vcfGroup, groupToVcfMapping.get(vcfGroup)); }); - System.out.println(groupToVcfMapping.size()); } private void writeVcfIndex(String vcfGroup, List vcfFiles) { From 83c388887812b08703d86817a56c6bb649c63d07 Mon Sep 17 00:00:00 2001 From: Ryan Amari Date: Mon, 17 Mar 2025 11:47:02 -0400 Subject: [PATCH 21/25] Fix string == bug that was actually working --- .../avillach/hpds/etl/genotype/SplitChromosomeVcfLoader.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/SplitChromosomeVcfLoader.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/SplitChromosomeVcfLoader.java index 2285d916..8f3196c5 100644 --- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/SplitChromosomeVcfLoader.java +++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/genotype/SplitChromosomeVcfLoader.java @@ -109,6 +109,7 @@ private void loadSingleContig(VCFWalker walker) throws IOException { int[] currentPosition = { -1 }; String[] currentRef = new String[1]; String[] currentAlt = new String[1]; + String[] currentVariantSpec = new String[1]; zygosityMaskStrings = new HashMap(); @@ -120,6 +121,7 @@ private void loadSingleContig(VCFWalker walker) throws IOException { currentPosition[0] = walker.currentPosition; currentRef[0] = walker.currentRef; currentAlt[0] = walker.currentAlt; + currentVariantSpec[0] = walker.currentSpecNotation(); currentChunk = walker.currentPosition / CHUNK_SIZE; positionsProcessedInChunk.add(currentPosition[0]); @@ -143,8 +145,7 @@ private void loadSingleContig(VCFWalker walker) throws IOException { } } - while (walker.currentPosition == currentPosition[0] && Objects.equals(walker.currentAlt, currentAlt[0]) - && Objects.equals(walker.currentRef, currentRef[0]) && walker.currentContig.contentEquals(currentContig[0]) && walker.hasNext) { + while (Objects.equals(walker.currentSpecNotation(), currentVariantSpec[0]) && 
walker.hasNext) { walker.updateRecords(maskStringsForVariantSpec[0], infoStoreMap); try { walker.nextLine(); From b056061499daf3b245e6d3cb1104c8f7c61dd457 Mon Sep 17 00:00:00 2001 From: Ryan Amari Date: Tue, 18 Mar 2025 09:39:29 -0400 Subject: [PATCH 22/25] Fix integration tests --- service/src/main/resources/application.properties | 2 +- .../hms/dbmi/avillach/hpds/service/PicSureServiceTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/service/src/main/resources/application.properties b/service/src/main/resources/application.properties index 87ebec84..a4d9352a 100644 --- a/service/src/main/resources/application.properties +++ b/service/src/main/resources/application.properties @@ -1,4 +1,4 @@ SMALL_JOB_LIMIT = 100 SMALL_TASK_THREADS = 1 LARGE_TASK_THREADS = 1 -enable_file_sharing=true \ No newline at end of file +enable_file_sharing=false \ No newline at end of file diff --git a/service/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/service/PicSureServiceTest.java b/service/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/service/PicSureServiceTest.java index 4bfefe5e..cc5f34d4 100644 --- a/service/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/service/PicSureServiceTest.java +++ b/service/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/service/PicSureServiceTest.java @@ -80,7 +80,7 @@ void should400ForBadResultType() { Mockito.when(query.getPicSureId()).thenReturn(UUID.randomUUID().toString()); ResponseEntity actual = subject.writeQueryResult(query, "patients"); Assertions.assertEquals(HttpStatus.BAD_REQUEST, actual.getStatusCode()); - Assertions.assertEquals("The write endpoint only writes time series dataframes and patients. Fix result type.", actual.getBody()); + Assertions.assertEquals("The write endpoint only writes time series dataframes. 
Fix result type.", actual.getBody()); } } From a2d1e526fd3a1ab4df79b4a9d6e4b8fde16c47d7 Mon Sep 17 00:00:00 2001 From: Ryan Amari Date: Thu, 20 Mar 2025 14:38:16 -0400 Subject: [PATCH 23/25] Add gic institute spring profile --- .../main/resources/application-gic-institute.properties | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 service/src/main/resources/application-gic-institute.properties diff --git a/service/src/main/resources/application-gic-institute.properties b/service/src/main/resources/application-gic-institute.properties new file mode 100644 index 00000000..7a68ec35 --- /dev/null +++ b/service/src/main/resources/application-gic-institute.properties @@ -0,0 +1,7 @@ +SMALL_JOB_LIMIT = 100 +SMALL_TASK_THREADS = 1 +LARGE_TASK_THREADS = 1 +VCF_EXCERPT_ENABLED=true + +hpds.genomicProcessor.impl=localPatientDistributed +HPDS_GENOMIC_DATA_DIRECTORY=/opt/local/hpds/all/ \ No newline at end of file From 1c36a64217a63382620984842cebb0b6b83e9a82 Mon Sep 17 00:00:00 2001 From: Ryan Amari Date: Thu, 20 Mar 2025 14:48:41 -0400 Subject: [PATCH 24/25] Add gic institute spring profile --- ...n-gic-institute.properties => application-gic-site.properties} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename service/src/main/resources/{application-gic-institute.properties => application-gic-site.properties} (100%) diff --git a/service/src/main/resources/application-gic-institute.properties b/service/src/main/resources/application-gic-site.properties similarity index 100% rename from service/src/main/resources/application-gic-institute.properties rename to service/src/main/resources/application-gic-site.properties From 1afff4ba1e56c94e255de44886fbeaa8b733c0c0 Mon Sep 17 00:00:00 2001 From: Ryan Amari Date: Fri, 21 Mar 2025 09:04:28 -0400 Subject: [PATCH 25/25] ALS-7737: Set gic site spring param --- service/src/main/resources/application-gic-site.properties | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/service/src/main/resources/application-gic-site.properties b/service/src/main/resources/application-gic-site.properties index 7a68ec35..c00b1a01 100644 --- a/service/src/main/resources/application-gic-site.properties +++ b/service/src/main/resources/application-gic-site.properties @@ -4,4 +4,6 @@ LARGE_TASK_THREADS = 1 VCF_EXCERPT_ENABLED=true hpds.genomicProcessor.impl=localPatientDistributed -HPDS_GENOMIC_DATA_DIRECTORY=/opt/local/hpds/all/ \ No newline at end of file +HPDS_GENOMIC_DATA_DIRECTORY=/opt/local/hpds/all/ + +enable_file_sharing=true \ No newline at end of file
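
For readers tracing the variant explorer fix (PATCH 16 through PATCH 19), the aggregation those commits introduce boils down to the loop below. This is a minimal sketch, not the committed method: it assumes the GenomicProcessor.getMasks/getPatientIds signatures and the VariableVariantMasks.append helper exactly as they appear in the diffs above.

// Sketch of the per-partition mask merge from PATCH 16/17. Nodes hold disjoint
// patient slices, so masks are appended in node order rather than OR'd together.
// A fresh VariantBucketHolder is passed to each node (the PATCH 17 change) because
// the bucket cache belongs to one node's variant store and must not be shared.
import java.util.List;
import java.util.Optional;
import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariableVariantMasks;
import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.caching.VariantBucketHolder;
import edu.harvard.hms.dbmi.avillach.hpds.processing.GenomicProcessor;

class MaskMergeSketch {
    static Optional<VariableVariantMasks> mergeMasks(String path, List<GenomicProcessor> nodes) {
        VariableVariantMasks aggregated = null;
        int patientsSoFar = 0;
        for (GenomicProcessor node : nodes) {
            // A node with no record of this variant still contributes an empty
            // placeholder mask, so its patients keep their positions in the merged mask.
            VariableVariantMasks masks = node.getMasks(path, new VariantBucketHolder<>())
                    .orElseGet(VariableVariantMasks::new);
            int nodePatients = node.getPatientIds().size();
            aggregated = (aggregated == null)
                    ? masks
                    : VariableVariantMasks.append(aggregated, patientsSoFar, masks, nodePatients);
            patientsSoFar += nodePatients;
        }
        return Optional.ofNullable(aggregated);
    }
}

The tests in PATCH 18 exercise exactly this behavior: three partitions of 4, 8, and 4 patients merge into a single bitmask spanning all patients, and a partition that returns Optional.empty() contributes no set bits for its slice.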
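
One more sketch, on the Spring profile fixes (PATCH 11, 13, and 23 through 25): hpds.genomicProcessor.impl is matched against the havingValue of a @ConditionalOnProperty bean in GenomicProcessorConfig, which is why PATCH 13 shortens the value from localPatientDistributedGenomicProcessor (the bean name) to localPatientDistributed. The binding below is assumed from that fix and from the localDistributed and localPatientOnlyDistributed beans shown earlier; the bean body is a stub, not the real wiring.

import java.util.List;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import edu.harvard.hms.dbmi.avillach.hpds.processing.GenomicProcessor;
import edu.harvard.hms.dbmi.avillach.hpds.processing.GenomicProcessorPatientMergingParentImpl;

@Configuration
class GenomicProcessorProfileSketch {
    // The property value in application-bch-dev.properties (and later the gic-site
    // profile) must equal this havingValue exactly; a bean-style name like
    // localPatientDistributedGenomicProcessor matches nothing, so no processor loads.
    @Bean(name = "localPatientDistributedGenomicProcessor")
    @ConditionalOnProperty(prefix = "hpds.genomicProcessor", name = "impl", havingValue = "localPatientDistributed")
    GenomicProcessor localPatientDistributedGenomicProcessor() {
        // Stub: the real bean builds one parent per study directory and merges them.
        return new GenomicProcessorPatientMergingParentImpl(List.of());
    }
}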