Rework crawl page migration #2412

Merged Feb 20, 2025 (70 commits)
Changes from 49 commits
Commits
05a7419
migration fixes:
ikreymer Feb 18, 2025
0ae4291
bump version to 1.14.0-beta.2
ikreymer Feb 18, 2025
feb4f09
bump to 1.14.0-beta.3
ikreymer Feb 18, 2025
16dd7a4
typo fix
ikreymer Feb 18, 2025
9ab4dac
empty filenames migration: use existing bulk re_add method for faster…
ikreymer Feb 18, 2025
20b6a73
bump to 1.14.0-beta.4
ikreymer Feb 18, 2025
b57e4fb
Add version to BaseCrawl, set to 2 for new crawls/uploads
tw4l Feb 18, 2025
e390ba3
Use optimize pages background job in 0042
tw4l Feb 18, 2025
4ea162f
Hacky workaround: Pass default org to retry_background_job
tw4l Feb 18, 2025
b67e2c7
Make sure job failure emails are sent even if no oid
tw4l Feb 18, 2025
bd2e2c2
Fix typing error
tw4l Feb 18, 2025
eb455a1
Only include replay optimization fields if all pages are optimized
tw4l Feb 18, 2025
bef7a18
Set isMigrating at same time as pulling next crawl
tw4l Feb 18, 2025
4f90555
Redo background_jobs typing, add superuser non-org list endpoint
tw4l Feb 18, 2025
25c8244
Remove crawl_type from optimize_pages bg job
tw4l Feb 18, 2025
1891ef2
Handle running crawls in page optimization
tw4l Feb 18, 2025
b2d97f0
Remove isMigrating filter from running crawl check
tw4l Feb 18, 2025
ccd7f1e
Fix bg job retry
tw4l Feb 18, 2025
19d2547
Cast job to right type in retry
tw4l Feb 18, 2025
7bc3397
Add API endpoint to launch migrate crawls job
tw4l Feb 18, 2025
538d4a3
Use SuccessResponseId model to include job id in response
tw4l Feb 18, 2025
7d27feb
Add first draft of upgrade notes in docs
tw4l Feb 18, 2025
821c02d
cleanup:
ikreymer Feb 18, 2025
d311692
add logging to optimize pages job
ikreymer Feb 18, 2025
07d3c25
logging
ikreymer Feb 18, 2025
9fd26bf
check for empty result
ikreymer Feb 19, 2025
39ab851
version: bump to 1.14.0-beta.5
ikreymer Feb 19, 2025
f71a89e
fix response model
ikreymer Feb 19, 2025
533f78c
add retries for add_crawl_pages_to_db_from_wacz()
ikreymer Feb 19, 2025
25c6e3a
back to single background_job.yaml, just pass scale as param
ikreymer Feb 19, 2025
b4dcfd4
fix parallelism setting
ikreymer Feb 19, 2025
35b7883
retry at the wacz level, not crawl
ikreymer Feb 19, 2025
ed66a26
bg jobs: make single crawl readd a background job as well, if crawl_i…
ikreymer Feb 19, 2025
358bb51
add sort
ikreymer Feb 19, 2025
e9f070b
better call
ikreymer Feb 19, 2025
e94f8be
logging
ikreymer Feb 19, 2025
01431d7
ensure pages are streamed from each wacz on demand
ikreymer Feb 19, 2025
a486571
ignore dupes on insertMany
ikreymer Feb 19, 2025
d0c274e
attempt dedup within wacz
ikreymer Feb 19, 2025
5d949aa
ignore dupe errors?
ikreymer Feb 19, 2025
18301e3
update logging, add crawl_id + url index
ikreymer Feb 19, 2025
0936ccf
lint fix
ikreymer Feb 19, 2025
8742657
optimize pages: don't use $facet, don't compute actual total
ikreymer Feb 19, 2025
0c805f1
add index for default sort
ikreymer Feb 19, 2025
d379570
remove unique page counts from org metrics
ikreymer Feb 19, 2025
6ad27e1
update model
ikreymer Feb 19, 2025
b299941
remove resources from collection list
ikreymer Feb 19, 2025
06e193b
optimize coll /replay.json
ikreymer Feb 19, 2025
1dafa5b
optimize page snapshot query:
ikreymer Feb 19, 2025
c95022a
crawl /replay.json optimization:
ikreymer Feb 19, 2025
e3e3651
cleanup
ikreymer Feb 19, 2025
0efe822
fix typo
ikreymer Feb 19, 2025
4d6b21c
lint
ikreymer Feb 19, 2025
77caa40
Delay rendering components doing data fetching in collection settings…
emma-sg Feb 19, 2025
d6e76c4
optimize page snapshots, add non-group alternative
ikreymer Feb 19, 2025
85d4c6f
fix page_count update?
ikreymer Feb 20, 2025
bf6f24b
tweak page snapshot sort order
ikreymer Feb 20, 2025
f2ce698
use gte
ikreymer Feb 20, 2025
2861292
readd unquote
ikreymer Feb 20, 2025
c639e25
lint fix
ikreymer Feb 20, 2025
837b35c
tweak
ikreymer Feb 20, 2025
9e093b9
more cleanup:
ikreymer Feb 20, 2025
7042d16
test fix
ikreymer Feb 20, 2025
15a5332
test fix
ikreymer Feb 20, 2025
3731172
Update backend/btrixcloud/crawlmanager.py
ikreymer Feb 20, 2025
931e351
Pages Query: use $text search or prefix search (#2415)
ikreymer Feb 20, 2025
6e70662
Fix typo in response
tw4l Feb 20, 2025
fb9b65b
Add test asserts for crawl/upload version field
tw4l Feb 20, 2025
5713f88
remove dupe index
ikreymer Feb 20, 2025
967d2bd
set pages migration job to run at scale 1 by default
ikreymer Feb 20, 2025
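
A note on the de-duplication commits above ("ignore dupes on insertMany", "attempt dedup within wacz", "ignore dupe errors?", "add crawl_id + url index"): the pattern they describe is an unordered bulk insert that tolerates duplicate-key errors instead of aborting the batch. The sketch below is only an illustration of that pattern, not code from this PR; the collection handle, field names, and the assumption that duplicates surface as E11000 errors are all assumptions, and whether dedup keys off `_id` or a dedicated index is not visible in this diff.

```python
from pymongo import ASCENDING
from pymongo.errors import BulkWriteError


async def insert_pages_ignoring_dupes(pages_coll, page_docs: list[dict]) -> None:
    """Bulk-insert page docs, skipping duplicates instead of aborting the batch."""
    if not page_docs:
        return
    try:
        # ordered=False lets MongoDB keep inserting past individual failures
        await pages_coll.insert_many(page_docs, ordered=False)
    except BulkWriteError as bwe:
        # E11000 (duplicate key) errors are expected for re-added pages;
        # anything else should still surface
        other = [
            err
            for err in bwe.details.get("writeErrors", [])
            if err.get("code") != 11000
        ]
        if other:
            raise


async def ensure_page_lookup_index(pages_coll) -> None:
    # Compound index on crawl_id + url, per the commit message above
    await pages_coll.create_index([("crawl_id", ASCENDING), ("url", ASCENDING)])
```

With `ordered=False`, the server continues past individual duplicate-key failures, which is what lets already-present pages be skipped cheaply during re-adds.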
189 changes: 163 additions & 26 deletions backend/btrixcloud/background_jobs.py
@@ -23,11 +23,13 @@
DeleteOrgJob,
RecalculateOrgStatsJob,
ReAddOrgPagesJob,
OptimizePagesJob,
PaginatedBackgroundJobResponse,
AnyJob,
StorageRef,
User,
SuccessResponse,
SuccessResponseId,
)
from .pagination import DEFAULT_PAGE_SIZE, paginated_format
from .utils import dt_now
@@ -382,6 +384,7 @@ async def create_re_add_org_pages_job(
self,
oid: UUID,
crawl_type: Optional[str] = None,
crawl_id: Optional[str] = None,
existing_job_id: Optional[str] = None,
):
"""Create job to (re)add all pages in an org, optionally filtered by crawl type"""
@@ -390,6 +393,7 @@
job_id = await self.crawl_manager.run_re_add_org_pages_job(
oid=str(oid),
crawl_type=crawl_type,
crawl_id=crawl_id,
existing_job_id=existing_job_id,
)
if existing_job_id:
@@ -410,6 +414,7 @@
id=job_id,
oid=oid,
crawl_type=crawl_type,
crawl_id=crawl_id,
started=dt_now(),
)

@@ -424,18 +429,58 @@
print(f"warning: re-add org pages job could not be started: {exc}")
return None

async def create_optimize_crawl_pages_job(
self,
existing_job_id: Optional[str] = None,
):
"""Create job to optimize crawl pages"""

try:
job_id = await self.crawl_manager.run_optimize_pages_job(
existing_job_id=existing_job_id,
)
if existing_job_id:
optimize_pages_job = await self.get_background_job(existing_job_id)
previous_attempt = {
"started": optimize_pages_job.started,
"finished": optimize_pages_job.finished,
}
if optimize_pages_job.previousAttempts:
optimize_pages_job.previousAttempts.append(previous_attempt)
else:
optimize_pages_job.previousAttempts = [previous_attempt]
optimize_pages_job.started = dt_now()
optimize_pages_job.finished = None
optimize_pages_job.success = None
else:
optimize_pages_job = OptimizePagesJob(
id=job_id,
started=dt_now(),
)

await self.jobs.find_one_and_update(
{"_id": job_id}, {"$set": optimize_pages_job.to_dict()}, upsert=True
)

return job_id
# pylint: disable=broad-exception-caught
except Exception as exc:
# pylint: disable=raise-missing-from
print(f"warning: optimize pages job could not be started: {exc}")
return None

async def job_finished(
self,
job_id: str,
job_type: str,
oid: UUID,
success: bool,
finished: datetime,
oid: Optional[UUID] = None,
) -> None:
"""Update job as finished, including
job-specific task handling"""

job = await self.get_background_job(job_id, oid)
job = await self.get_background_job(job_id)
if job.finished:
return

@@ -455,14 +500,16 @@
flush=True,
)
superuser = await self.user_manager.get_superuser()
org = await self.org_ops.get_org_by_id(job.oid)
org = None
if job.oid:
org = await self.org_ops.get_org_by_id(job.oid)
await asyncio.get_event_loop().run_in_executor(
None,
self.email.send_background_job_failed,
job,
org,
finished,
superuser.email,
org,
)

await self.jobs.find_one_and_update(
@@ -478,6 +525,7 @@ async def get_background_job(
DeleteOrgJob,
RecalculateOrgStatsJob,
ReAddOrgPagesJob,
OptimizePagesJob,
]:
"""Get background job"""
query: dict[str, object] = {"_id": job_id}
@@ -504,11 +552,14 @@ def _get_job_by_type_from_data(self, data: dict[str, object]):
if data["type"] == BgJobType.READD_ORG_PAGES:
return ReAddOrgPagesJob.from_dict(data)

if data["type"] == BgJobType.OPTIMIZE_PAGES:
return OptimizePagesJob.from_dict(data)

return DeleteOrgJob.from_dict(data)

async def list_background_jobs(
self,
org: Organization,
org: Optional[Organization] = None,
page_size: int = DEFAULT_PAGE_SIZE,
page: int = 1,
success: Optional[bool] = None,
@@ -522,7 +573,10 @@ async def list_background_jobs(
page = page - 1
skip = page_size * page

query: dict[str, object] = {"oid": org.id}
query: dict[str, object] = {}

if org:
query["oid"] = org.id

if success in (True, False):
query["success"] = success
@@ -590,10 +644,10 @@ async def get_replica_job_file(
raise HTTPException(status_code=404, detail="file_not_found")

async def retry_background_job(
self, job_id: str, org: Organization
) -> Dict[str, Union[bool, Optional[str]]]:
self, job_id: str, org: Optional[Organization] = None
):
"""Retry background job"""
job = await self.get_background_job(job_id, org.id)
job = await self.get_background_job(job_id)
if not job:
raise HTTPException(status_code=404, detail="job_not_found")

@@ -603,7 +657,23 @@ async def retry_background_job(
if job.success:
raise HTTPException(status_code=400, detail="job_already_succeeded")

if org:
return await self.retry_org_background_job(job, org)

if job.type == BgJobType.OPTIMIZE_PAGES:
await self.create_optimize_crawl_pages_job(
existing_job_id=job_id,
)
return {"success": True}

return {"success": False}

async def retry_org_background_job(
self, job: BackgroundJob, org: Organization
) -> Dict[str, Union[bool, Optional[str]]]:
"""Retry background job specific to one org"""
if job.type == BgJobType.CREATE_REPLICA:
job = cast(CreateReplicaJob, job)
file = await self.get_replica_job_file(job, org)
primary_storage = self.storage_ops.get_org_storage_by_ref(org, file.storage)
primary_endpoint, bucket_suffix = self.strip_bucket(
@@ -618,10 +688,12 @@ async def retry_background_job(
job.replica_storage,
primary_file_path,
primary_endpoint,
existing_job_id=job_id,
existing_job_id=job.id,
)
return {"success": True}

if job.type == BgJobType.DELETE_REPLICA:
job = cast(DeleteReplicaJob, job)
file = await self.get_replica_job_file(job, org)
await self.create_delete_replica_job(
org,
@@ -630,31 +702,39 @@ async def retry_background_job(
job.object_type,
job.replica_storage,
force_start_immediately=True,
existing_job_id=job_id,
existing_job_id=job.id,
)
return {"success": True}

if job.type == BgJobType.DELETE_ORG:
job = cast(DeleteOrgJob, job)
await self.create_delete_org_job(
org,
existing_job_id=job_id,
existing_job_id=job.id,
)
return {"success": True}

if job.type == BgJobType.RECALCULATE_ORG_STATS:
job = cast(RecalculateOrgStatsJob, job)
await self.create_recalculate_org_stats_job(
org,
existing_job_id=job_id,
existing_job_id=job.id,
)
return {"success": True}

if job.type == BgJobType.READD_ORG_PAGES:
job = cast(ReAddOrgPagesJob, job)
await self.create_re_add_org_pages_job(
org.id,
job.crawl_type,
existing_job_id=job_id,
job.crawl_id,
existing_job_id=job.id,
)
return {"success": True}

return {"success": True}
return {"success": False}

async def retry_failed_background_jobs(
async def retry_failed_org_background_jobs(
self, org: Organization
) -> Dict[str, Union[bool, Optional[str]]]:
"""Retry all failed background jobs in an org
@@ -679,7 +759,9 @@ async def retry_all_failed_background_jobs(
"""
bg_tasks = set()
async for job in self.jobs.find({"success": False}):
org = await self.org_ops.get_org_by_id(job["oid"])
org = None
if job.get("oid"):
org = await self.org_ops.get_org_by_id(job["oid"])
task = asyncio.create_task(self.retry_background_job(job["_id"], org))
bg_tasks.add(task)
task.add_done_callback(bg_tasks.discard)
@@ -707,23 +789,51 @@ def init_background_jobs_api(
"/{job_id}",
response_model=AnyJob,
)
async def get_background_job(
async def get_org_background_job(
job_id: str,
org: Organization = Depends(org_crawl_dep),
):
"""Retrieve information for background job"""
return await ops.get_background_job(job_id, org.id)

@app.get("/orgs/all/jobs/{job_id}", response_model=SuccessResponse, tags=["jobs"])
@app.get("/orgs/all/jobs/{job_id}", response_model=AnyJob, tags=["jobs"])
async def get_background_job_all_orgs(job_id: str, user: User = Depends(user_dep)):
"""Get background job from any org"""
if not user.is_superuser:
raise HTTPException(status_code=403, detail="Not Allowed")

return await ops.get_background_job(job_id)

@router.post("/{job_id}/retry", response_model=SuccessResponse)
async def retry_background_job(
@app.post(
"/orgs/all/jobs/{job_id}/retry", response_model=SuccessResponse, tags=["jobs"]
)
async def retry_background_job_no_org(job_id: str, user: User = Depends(user_dep)):
"""Retry backgound job that doesn't belong to an org, e.g. migration job"""
if not user.is_superuser:
raise HTTPException(status_code=403, detail="Not Allowed")

job = await ops.get_background_job(job_id)

org = None
if job.oid:
org = await ops.org_ops.get_org_by_id(job.oid)

return await ops.retry_background_job(job_id, org)

@app.post(
"/orgs/all/jobs/migrateCrawls", response_model=SuccessResponseId, tags=["jobs"]
)
async def create_migrate_crawls_job(user: User = Depends(user_dep)):
"""Launch background job to migrate all crawls to v2 with optimized pages"""
if not user.is_superuser:
raise HTTPException(status_code=403, detail="Not Allowed")

job_id = await ops.create_optimize_crawl_pages_job()

return {"sucess": True, "id": job_id}

@router.post("/{job_id}/retry", response_model=SuccessResponse, tags=["jobs"])
async def retry_org_background_job(
job_id: str,
org: Organization = Depends(org_crawl_dep),
):
@@ -740,14 +850,41 @@ async def retry_all_failed_background_jobs(user: User = Depends(user_dep)):

return await ops.retry_all_failed_background_jobs()

@router.post("/retryFailed", response_model=SuccessResponse)
async def retry_failed_background_jobs(
@router.post("/retryFailed", response_model=SuccessResponse, tags=["jobs"])
async def retry_failed_org_background_jobs(
org: Organization = Depends(org_crawl_dep),
):
"""Retry failed background jobs"""
return await ops.retry_failed_background_jobs(org)
return await ops.retry_failed_org_background_jobs(org)

@app.get(
"/orgs/all/jobs", response_model=PaginatedBackgroundJobResponse, tags=["jobs"]
)
async def list_all_background_jobs(
pageSize: int = DEFAULT_PAGE_SIZE,
page: int = 1,
success: Optional[bool] = None,
jobType: Optional[str] = None,
sortBy: Optional[str] = None,
sortDirection: Optional[int] = -1,
user: User = Depends(user_dep),
):
"""Retrieve paginated list of background jobs"""
if not user.is_superuser:
raise HTTPException(status_code=403, detail="Not Allowed")

jobs, total = await ops.list_background_jobs(
org=None,
page_size=pageSize,
page=page,
success=success,
job_type=jobType,
sort_by=sortBy,
sort_direction=sortDirection,
)
return paginated_format(jobs, total, page, pageSize)

@router.get("", response_model=PaginatedBackgroundJobResponse)
@router.get("", response_model=PaginatedBackgroundJobResponse, tags=["jobs"])
async def list_background_jobs(
org: Organization = Depends(org_crawl_dep),
pageSize: int = DEFAULT_PAGE_SIZE,
@@ -759,7 +896,7 @@ async def list_background_jobs(
):
"""Retrieve paginated list of background jobs"""
jobs, total = await ops.list_background_jobs(
org,
org=org,
page_size=pageSize,
page=page,
success=success,
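
For reference, a hedged sketch of how a superuser might drive the new non-org job endpoints added in this file: launching the crawl migration job, polling it, retrying it, and listing jobs across all orgs. This is not part of the PR; the deployment URL, token handling, and use of the `requests` library are assumptions, and the paths simply mirror the routes in the diff above.

```python
import requests

API = "https://btrix.example.com/api"  # assumed deployment URL
HEADERS = {"Authorization": "Bearer <superuser-token>"}  # assumed auth token

# Launch the background job that migrates all crawls/uploads to version 2
resp = requests.post(f"{API}/orgs/all/jobs/migrateCrawls", headers=HEADERS)
job_id = resp.json()["id"]

# Poll the job via the superuser (non-org) endpoint
job = requests.get(f"{API}/orgs/all/jobs/{job_id}", headers=HEADERS).json()
print(job_id, job.get("success"), job.get("finished"))

# If the job failed, retry it; this route works even though the job has no oid
if job.get("success") is False:
    requests.post(f"{API}/orgs/all/jobs/{job_id}/retry", headers=HEADERS)

# List background jobs across all orgs (superuser only, paginated)
listing = requests.get(
    f"{API}/orgs/all/jobs", headers=HEADERS, params={"pageSize": 10, "page": 1}
).json()
print(listing.get("total"))
```

Because the migration job carries no oid, the retry goes through `/orgs/all/jobs/{job_id}/retry`, which resolves the org only if the job has one.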
17 changes: 9 additions & 8 deletions backend/btrixcloud/basecrawls.py
@@ -170,16 +170,17 @@ async def get_crawl_out(
if coll_ids:
res["collections"] = await self.colls.get_collection_names(coll_ids)

res["initialPages"], _ = await self.page_ops.list_pages(
crawlid, is_seed=True, page_size=25
)

oid = res.get("oid")
if oid:
res["pagesQueryUrl"] = (
get_origin(headers) + f"/api/orgs/{oid}/crawls/{crawlid}/pages"
if res.get("version", 1) == 2:
res["initialPages"], _ = await self.page_ops.list_pages(
crawlid, is_seed=True, page_size=25
)

oid = res.get("oid")
if oid:
res["pagesQueryUrl"] = (
get_origin(headers) + f"/api/orgs/{oid}/crawls/{crawlid}/pages"
)

crawl = CrawlOutWithResources.from_dict(res)

if not skip_resources:
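
The change above attaches `initialPages` and `pagesQueryUrl` to a crawl's `/replay.json` response only once the crawl has been migrated to version 2. Below is a hedged client-side sketch of handling both cases; it is not part of this PR, and the endpoint paths, the `pageSize` query parameter, and the `items` key of the paginated response are assumptions inferred from the diff.

```python
import requests


def get_initial_pages(api_base: str, org_id: str, crawl_id: str, token: str) -> list[dict]:
    """Return seed pages for a crawl, for both migrated (v2) and unmigrated crawls."""
    headers = {"Authorization": f"Bearer {token}"}
    crawl = requests.get(
        f"{api_base}/api/orgs/{org_id}/crawls/{crawl_id}/replay.json",
        headers=headers,
    ).json()

    # Migrated (version 2) crawls embed the first batch of seed pages directly
    if "initialPages" in crawl:
        return crawl["initialPages"]

    # Unmigrated crawls: fall back to querying the pages endpoint
    pages_url = crawl.get("pagesQueryUrl") or (
        f"{api_base}/api/orgs/{org_id}/crawls/{crawl_id}/pages"
    )
    resp = requests.get(pages_url, headers=headers, params={"pageSize": 25})
    return resp.json().get("items", [])
```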