Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove MultiPlatform Process abstractions #12725

Merged
merged 1 commit into from
Dec 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 4 additions & 8 deletions src/python/pants/backend/python/util_rules/pex.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,7 @@
PathGlobs,
)
from pants.engine.platform import Platform
from pants.engine.process import (
BashBinary,
MultiPlatformProcess,
Process,
ProcessCacheScope,
ProcessResult,
)
from pants.engine.process import BashBinary, Process, ProcessCacheScope, ProcessResult
from pants.engine.rules import Get, collect_rules, rule
from pants.util.docutil import doc_url
from pants.util.frozendict import FrozenDict
Expand Down Expand Up @@ -544,10 +538,12 @@ async def build_pex(
),
)

process = dataclasses.replace(process, platform=platform)

# NB: Building a Pex is platform dependent, so in order to get a PEX that we can use locally
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: This comment should probably be on the dataclasses.replace call.

# without cross-building, we specify that our PEX command should be run on the current local
# platform.
result = await Get(ProcessResult, MultiPlatformProcess({platform: process}))
result = await Get(ProcessResult, Process, process)

if pex_runtime_env.verbosity > 0:
log_output = result.stderr.decode()
Expand Down
4 changes: 2 additions & 2 deletions src/python/pants/engine/internals/engine_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -792,7 +792,7 @@ def test_process_digests_on_streaming_workunits(
assert tracker.finished
finished = list(itertools.chain.from_iterable(tracker.finished_workunit_chunks))

process_workunit = next(item for item in finished if item["name"] == "multi_platform_process")
process_workunit = next(item for item in finished if item["name"] == "process")
assert process_workunit is not None
stdout_digest = process_workunit["artifacts"]["stdout_digest"]
stderr_digest = process_workunit["artifacts"]["stderr_digest"]
Expand Down Expand Up @@ -820,7 +820,7 @@ def test_process_digests_on_streaming_workunits(

assert tracker.finished
finished = list(itertools.chain.from_iterable(tracker.finished_workunit_chunks))
process_workunit = next(item for item in finished if item["name"] == "multi_platform_process")
process_workunit = next(item for item in finished if item["name"] == "process")

assert process_workunit is not None
stdout_digest = process_workunit["artifacts"]["stdout_digest"]
Expand Down
4 changes: 2 additions & 2 deletions src/python/pants/engine/internals/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
FallibleProcessResult,
InteractiveProcess,
InteractiveProcessResult,
MultiPlatformProcess,
Process,
ProcessResultMetadata,
)
from pants.engine.rules import Rule, RuleIndex, TaskRule
Expand Down Expand Up @@ -158,7 +158,7 @@ def __init__(
digest_subset=DigestSubset,
download_file=DownloadFile,
platform=Platform,
multi_platform_process=MultiPlatformProcess,
process=Process,
process_result=FallibleProcessResult,
process_result_metadata=ProcessResultMetadata,
coroutine=CoroutineType,
Expand Down
39 changes: 5 additions & 34 deletions src/python/pants/engine/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class Process:
use_nailgun: Digest
execution_slot_variable: str | None
cache_scope: ProcessCacheScope
platform: str | None

def __init__(
self,
Expand All @@ -84,6 +85,7 @@ def __init__(
use_nailgun: Digest = EMPTY_DIGEST,
execution_slot_variable: str | None = None,
cache_scope: ProcessCacheScope = ProcessCacheScope.SUCCESSFUL,
platform: Platform | None = None,
) -> None:
"""Request to run a subprocess, similar to subprocess.Popen.

Expand Down Expand Up @@ -128,32 +130,7 @@ def __init__(
self.use_nailgun = use_nailgun
self.execution_slot_variable = execution_slot_variable
self.cache_scope = cache_scope


@frozen_after_init
@dataclass(unsafe_hash=True)
class MultiPlatformProcess:
platform_constraints: tuple[str | None, ...]
processes: tuple[Process, ...]

def __init__(self, request_dict: dict[Platform | None, Process]) -> None:
if len(request_dict) == 0:
raise ValueError("At least one platform-constrained Process must be passed.")
serialized_constraints = tuple(
constraint.value if constraint else None for constraint in request_dict
)
if len([req.description for req in request_dict.values()]) != 1:
raise ValueError(
f"The `description` of all processes in a {MultiPlatformProcess.__name__} must "
f"be identical, but got: {list(request_dict.values())}."
)

self.platform_constraints = serialized_constraints
self.processes = tuple(request_dict.values())

@property
def product_description(self) -> ProductDescription:
return ProductDescription(self.processes[0].description)
self.platform = platform.value if platform is not None else None


@dataclass(frozen=True)
Expand Down Expand Up @@ -259,14 +236,8 @@ def try_decode(content: bytes) -> str:


@rule
def get_multi_platform_request_description(req: MultiPlatformProcess) -> ProductDescription:
return req.product_description


@rule
def upcast_process(req: Process) -> MultiPlatformProcess:
"""This rule allows an Process to be run as a platform compatible MultiPlatformProcess."""
return MultiPlatformProcess({None: req})
def get_multi_platform_request_description(req: Process) -> ProductDescription:
return ProductDescription(req.description)


@rule
Expand Down
19 changes: 6 additions & 13 deletions src/rust/engine/process_execution/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use workunit_store::{
};

use crate::{
Context, FallibleProcessResultWithPlatform, MultiPlatformProcess, Platform, Process,
ProcessCacheScope, ProcessMetadata, ProcessResultSource,
Context, FallibleProcessResultWithPlatform, Platform, Process, ProcessCacheScope,
ProcessMetadata, ProcessResultSource,
};

// TODO: Consider moving into protobuf as a CacheValue type.
Expand Down Expand Up @@ -53,23 +53,16 @@ impl CommandRunner {

#[async_trait]
impl crate::CommandRunner for CommandRunner {
fn extract_compatible_request(&self, req: &MultiPlatformProcess) -> Option<Process> {
self.underlying.extract_compatible_request(req)
}

async fn run(
&self,
context: Context,
workunit: &mut RunningWorkunit,
req: MultiPlatformProcess,
req: Process,
) -> Result<FallibleProcessResultWithPlatform, String> {
let cache_lookup_start = Instant::now();
let write_failures_to_cache = req
.0
.values()
.any(|process| process.cache_scope == ProcessCacheScope::Always);
let write_failures_to_cache = req.cache_scope == ProcessCacheScope::Always;
let key = CacheKey {
digest: Some(crate::digest(req.clone(), &self.metadata).into()),
digest: Some(crate::digest(&req, &self.metadata).into()),
key_type: CacheKeyType::Process.into(),
};

Expand All @@ -80,7 +73,7 @@ impl crate::CommandRunner for CommandRunner {
"local_cache_read".to_owned(),
WorkunitMetadata {
level: Level::Trace,
desc: Some(format!("Local cache lookup: {}", req.user_facing_name())),
desc: Some(format!("Local cache lookup: {}", req.description)),
..WorkunitMetadata::default()
},
|workunit| async move {
Expand Down
106 changes: 15 additions & 91 deletions src/rust/engine/process_execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@
extern crate derivative;

use std::collections::{BTreeMap, BTreeSet};
use std::convert::TryFrom;
use std::convert::{TryFrom, TryInto};
use std::path::PathBuf;
use std::sync::Arc;

use async_semaphore::AsyncSemaphore;
use async_trait::async_trait;
use concrete_time::{Duration, TimeSpan};
use fs::RelativePath;
use hashing::{Digest, EMPTY_FINGERPRINT};
use hashing::Digest;
use log::Level;
use protos::gen::build::bazel::remote::execution::v2 as remexec;
use remexec::ExecutedActionMetadata;
Expand Down Expand Up @@ -329,51 +329,6 @@ impl Process {
}
}

impl TryFrom<MultiPlatformProcess> for Process {
type Error = String;

fn try_from(req: MultiPlatformProcess) -> Result<Self, Self::Error> {
match req.0.get(&None) {
Some(crossplatform_req) => Ok(crossplatform_req.clone()),
None => Err(String::from(
"Cannot coerce to a simple Process, no cross platform request exists.",
)),
}
}
}

///
/// A container of platform constrained processes.
///
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub struct MultiPlatformProcess(pub BTreeMap<Option<Platform>, Process>);

impl MultiPlatformProcess {
pub fn user_facing_name(&self) -> String {
self
.0
.iter()
.next()
.map(|(_platforms, process)| process.description.clone())
.unwrap_or_else(|| "<Unnamed process>".to_string())
}

pub fn workunit_level(&self) -> log::Level {
self
.0
.iter()
.next()
.map(|(_platforms, process)| process.level)
.unwrap_or(Level::Info)
}
}

impl From<Process> for MultiPlatformProcess {
fn from(proc: Process) -> Self {
MultiPlatformProcess(vec![(None, proc)].into_iter().collect())
}
}

///
/// Metadata surrounding an Process which factors into its cache key when cached
/// externally from the engine graph (e.g. when using remote execution or an external process
Expand Down Expand Up @@ -559,40 +514,15 @@ pub trait CommandRunner: Send + Sync {
&self,
context: Context,
workunit: &mut RunningWorkunit,
req: MultiPlatformProcess,
req: Process,
) -> Result<FallibleProcessResultWithPlatform, String>;

///
/// Given a multi platform request which may have some platform
/// constraints determine if any of the requests contained within are compatible
/// with the current command runners platform configuration. If so return the
/// first candidate that will be run if the multi platform request is submitted to
/// `fn run(..)`
fn extract_compatible_request(&self, req: &MultiPlatformProcess) -> Option<Process>;
}

// TODO(#8513) possibly move to the MEPR struct, or to the hashing crate?
pub fn digest(req: MultiPlatformProcess, metadata: &ProcessMetadata) -> Digest {
let mut hashes: Vec<String> = req
.0
.values()
.map(|process| crate::remote::make_execute_request(process, metadata.clone()).unwrap())
.map(|(_a, _b, er)| {
er.action_digest
.map(|d| d.hash)
.unwrap_or_else(|| EMPTY_FINGERPRINT.to_hex())
})
.collect();
hashes.sort();
Digest::of_bytes(
hashes
.iter()
.fold(String::new(), |mut acc, hash| {
acc.push_str(hash);
acc
})
.as_bytes(),
)
pub fn digest(process: &Process, metadata: &ProcessMetadata) -> Digest {
let (_, _, execute_request) =
crate::remote::make_execute_request(process, metadata.clone()).unwrap();
execute_request.action_digest.unwrap().try_into().unwrap()
}

///
Expand All @@ -617,7 +547,7 @@ impl CommandRunner for BoundedCommandRunner {
&self,
context: Context,
workunit: &mut RunningWorkunit,
mut req: MultiPlatformProcess,
mut process: Process,
) -> Result<FallibleProcessResultWithPlatform, String> {
let semaphore_acquisition = self.inner.1.acquire();
let permit = in_workunit!(
Expand All @@ -636,24 +566,18 @@ impl CommandRunner for BoundedCommandRunner {

log::debug!(
"Running {} under semaphore with concurrency id: {}",
req.user_facing_name(),
process.description,
permit.concurrency_slot()
);

for (_, process) in req.0.iter_mut() {
if let Some(ref execution_slot_env_var) = process.execution_slot_variable {
process.env.insert(
execution_slot_env_var.clone(),
format!("{}", permit.concurrency_slot()),
);
}
if let Some(ref execution_slot_env_var) = process.execution_slot_variable {
process.env.insert(
execution_slot_env_var.clone(),
format!("{}", permit.concurrency_slot()),
);
}

self.inner.0.run(context, workunit, req).await
}

fn extract_compatible_request(&self, req: &MultiPlatformProcess) -> Option<Process> {
self.inner.0.extract_compatible_request(req)
self.inner.0.run(context, workunit, process).await
}
}

Expand Down
14 changes: 2 additions & 12 deletions src/rust/engine/process_execution/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use tryfuture::try_future;
use workunit_store::{in_workunit, Level, Metric, RunningWorkunit, WorkunitMetadata};

use crate::{
Context, FallibleProcessResultWithPlatform, MultiPlatformProcess, NamedCaches, Platform, Process,
Context, FallibleProcessResultWithPlatform, NamedCaches, Platform, Process,
ProcessResultMetadata, ProcessResultSource,
};

Expand Down Expand Up @@ -234,25 +234,15 @@ impl ChildResults {

#[async_trait]
impl super::CommandRunner for CommandRunner {
fn extract_compatible_request(&self, req: &MultiPlatformProcess) -> Option<Process> {
for compatible_constraint in vec![None, self.platform.into()].iter() {
if let Some(compatible_req) = req.0.get(compatible_constraint) {
return Some(compatible_req.clone());
}
}
None
}

///
/// Runs a command on this machine in the passed working directory.
///
async fn run(
&self,
context: Context,
_workunit: &mut RunningWorkunit,
req: MultiPlatformProcess,
req: Process,
) -> Result<FallibleProcessResultWithPlatform, String> {
let req = self.extract_compatible_request(&req).unwrap();
let req_debug_repr = format!("{:#?}", req);
in_workunit!(
context.workunit_store.clone(),
Expand Down
Loading