Skip to content

Add async support in EngineClient, EngineSampler, etc. #5219

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Jun 16, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cirq-core/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# functools.cached_property was introduced in python 3.8
backports.cached_property~=1.0.1; python_version < '3.8'

duet~=0.2.0
duet~=0.2.6
matplotlib~=3.0
networkx~=2.4
numpy~=1.16
Expand Down
83 changes: 51 additions & 32 deletions cirq-google/cirq_google/engine/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import string
from typing import Dict, Iterable, List, Optional, Sequence, Set, TypeVar, Union, TYPE_CHECKING

import duet
import google.auth
from google.protobuf import any_pb2

Expand Down Expand Up @@ -208,7 +209,7 @@ def __str__(self) -> str:
return f'Engine(project_id={self.project_id!r})'

@util.deprecated_gate_set_parameter
def run(
async def run_async(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we're moving towards encouraging people to use EngineProcessor.run methods instead of Engine because AbstractEngine doesn't actually have any of these methods so they don't get mocked by the simulated versions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reverted changes to the run* methods on Engine.

self,
program: cirq.AbstractCircuit,
program_id: Optional[str] = None,
Expand Down Expand Up @@ -254,23 +255,25 @@ def run(
Raises:
ValueError: If no gate set is provided.
"""
return list(
self.run_sweep(
program=program,
program_id=program_id,
job_id=job_id,
params=[param_resolver],
repetitions=repetitions,
processor_ids=processor_ids,
program_description=program_description,
program_labels=program_labels,
job_description=job_description,
job_labels=job_labels,
)
)[0]
job = await self.run_sweep_async(
program=program,
program_id=program_id,
job_id=job_id,
params=[param_resolver],
repetitions=repetitions,
processor_ids=processor_ids,
program_description=program_description,
program_labels=program_labels,
job_description=job_description,
job_labels=job_labels,
)
results = await job.results_async()
return results[0]

run = duet.sync(run_async)

@util.deprecated_gate_set_parameter
def run_sweep(
async def run_sweep_async(
self,
program: cirq.AbstractCircuit,
program_id: Optional[str] = None,
Expand Down Expand Up @@ -320,10 +323,10 @@ def run_sweep(
Raises:
ValueError: If no gate set is provided.
"""
engine_program = self.create_program(
engine_program = await self.create_program_async(
program, program_id, description=program_description, labels=program_labels
)
return engine_program.run_sweep(
return await engine_program.run_sweep_async(
job_id=job_id,
params=params,
repetitions=repetitions,
Expand All @@ -332,8 +335,10 @@ def run_sweep(
labels=job_labels,
)

run_sweep = duet.sync(run_sweep_async)

@util.deprecated_gate_set_parameter
def run_batch(
async def run_batch_async(
self,
programs: Sequence[cirq.AbstractCircuit],
program_id: Optional[str] = None,
Expand Down Expand Up @@ -405,7 +410,7 @@ def run_batch(
engine_program = self.create_batch_program(
programs, program_id, description=program_description, labels=program_labels
)
return engine_program.run_batch(
return await engine_program.run_batch_async(
job_id=job_id,
params_list=params_list,
repetitions=repetitions,
Expand All @@ -414,6 +419,8 @@ def run_batch(
labels=job_labels,
)

run_batch = duet.sync(run_batch_async)

@util.deprecated_gate_set_parameter
def run_calibration(
self,
Expand Down Expand Up @@ -493,7 +500,7 @@ def run_calibration(
)

@util.deprecated_gate_set_parameter
def create_program(
async def create_program_async(
self,
program: cirq.AbstractCircuit,
program_id: Optional[str] = None,
Expand Down Expand Up @@ -524,7 +531,7 @@ def create_program(
if not program_id:
program_id = _make_random_id('prog-')

new_program_id, new_program = self.context.client.create_program(
new_program_id, new_program = await self.context.client.create_program_async(
self.project_id,
program_id,
code=self.context._serialize_program(program, gate_set),
Expand All @@ -536,8 +543,10 @@ def create_program(
self.project_id, new_program_id, self.context, new_program
)

create_program = duet.sync(create_program_async)

@util.deprecated_gate_set_parameter
def create_batch_program(
async def create_batch_program_async(
self,
programs: Sequence[cirq.AbstractCircuit],
program_id: Optional[str] = None,
Expand Down Expand Up @@ -574,7 +583,7 @@ def create_batch_program(
for program in programs:
gate_set.serialize(program, msg=batch.programs.add())

new_program_id, new_program = self.context.client.create_program(
new_program_id, new_program = await self.context.client.create_program_async(
self.project_id,
program_id,
code=util.pack_any(batch),
Expand All @@ -586,8 +595,10 @@ def create_batch_program(
self.project_id, new_program_id, self.context, new_program, result_type=ResultType.Batch
)

create_batch_program = duet.sync(create_batch_program_async)

@util.deprecated_gate_set_parameter
def create_calibration_program(
async def create_calibration_program_async(
self,
layers: List['cirq_google.CalibrationLayer'],
program_id: Optional[str] = None,
Expand Down Expand Up @@ -632,7 +643,7 @@ def create_calibration_program(
arg_to_proto(layer.args[arg], out=new_layer.args[arg])
gate_set.serialize(layer.program, msg=new_layer.layer)

new_program_id, new_program = self.context.client.create_program(
new_program_id, new_program = await self.context.client.create_program_async(
self.project_id,
program_id,
code=util.pack_any(calibration),
Expand All @@ -648,6 +659,8 @@ def create_calibration_program(
result_type=ResultType.Calibration,
)

create_calibration_program = duet.sync(create_calibration_program_async)

def get_program(self, program_id: str) -> engine_program.EngineProgram:
"""Returns an EngineProgram for an existing Quantum Engine program.

Expand All @@ -659,7 +672,7 @@ def get_program(self, program_id: str) -> engine_program.EngineProgram:
"""
return engine_program.EngineProgram(self.project_id, program_id, self.context)

def list_programs(
async def list_programs_async(
self,
created_before: Optional[Union[datetime.datetime, datetime.date]] = None,
created_after: Optional[Union[datetime.datetime, datetime.date]] = None,
Expand All @@ -681,7 +694,7 @@ def list_programs(
"""

client = self.context.client
response = client.list_programs(
response = await client.list_programs_async(
self.project_id,
created_before=created_before,
created_after=created_after,
Expand All @@ -697,7 +710,9 @@ def list_programs(
for p in response
]

def list_jobs(
list_programs = duet.sync(list_programs_async)

async def list_jobs_async(
self,
created_before: Optional[Union[datetime.datetime, datetime.date]] = None,
created_after: Optional[Union[datetime.datetime, datetime.date]] = None,
Expand Down Expand Up @@ -730,7 +745,7 @@ def list_jobs(
`quantum.ExecutionStatus.State` enum for accepted values.
"""
client = self.context.client
response = client.list_jobs(
response = await client.list_jobs_async(
self.project_id,
None,
created_before=created_before,
Expand All @@ -749,7 +764,9 @@ def list_jobs(
for j in response
]

def list_processors(self) -> List[engine_processor.EngineProcessor]:
list_jobs = duet.sync(list_jobs_async)

async def list_processors_async(self) -> List[engine_processor.EngineProcessor]:
"""Returns a list of Processors that the user has visibility to in the
current Engine project. The names of these processors are used to
identify devices when scheduling jobs and gathering calibration metrics.
Expand All @@ -758,14 +775,16 @@ def list_processors(self) -> List[engine_processor.EngineProcessor]:
A list of EngineProcessors to access status, device and calibration
information.
"""
response = self.context.client.list_processors(self.project_id)
response = await self.context.client.list_processors_async(self.project_id)
return [
engine_processor.EngineProcessor(
self.project_id, engine_client._ids_from_processor_name(p.name)[1], self.context, p
)
for p in response
]

list_processors = duet.sync(list_processors_async)

def get_processor(self, processor_id: str) -> engine_processor.EngineProcessor:
"""Returns an EngineProcessor for a Quantum Engine processor.

Expand Down
Loading