diff --git a/src/librustc_mir/dataflow/at_location.rs b/src/librustc_mir/dataflow/at_location.rs
index 7735528d8f8e0..f0014602e2d6b 100644
--- a/src/librustc_mir/dataflow/at_location.rs
+++ b/src/librustc_mir/dataflow/at_location.rs
@@ -8,6 +8,7 @@ use crate::dataflow::{BitDenotation, DataflowResults, GenKillSet};
 use crate::dataflow::move_paths::{HasMoveData, MovePathIndex};
 
 use std::iter;
+use std::borrow::Borrow;
 
 /// A trait for "cartesian products" of multiple FlowAtLocation.
 ///
@@ -60,18 +61,20 @@ pub trait FlowsAtLocation {
 /// (e.g., via `reconstruct_statement_effect` and
 /// `reconstruct_terminator_effect`; don't forget to call
 /// `apply_local_effect`).
-pub struct FlowAtLocation<'tcx, BD>
+pub struct FlowAtLocation<'tcx, BD, DR = DataflowResults<'tcx, BD>>
 where
     BD: BitDenotation<'tcx>,
+    DR: Borrow<DataflowResults<'tcx, BD>>,
 {
-    base_results: DataflowResults<'tcx, BD>,
+    base_results: DR,
     curr_state: BitSet<BD::Idx>,
     stmt_trans: GenKillSet<BD::Idx>,
 }
 
-impl<'tcx, BD> FlowAtLocation<'tcx, BD>
+impl<'tcx, BD, DR> FlowAtLocation<'tcx, BD, DR>
 where
     BD: BitDenotation<'tcx>,
+    DR: Borrow<DataflowResults<'tcx, BD>>,
 {
     /// Iterate over each bit set in the current state.
     pub fn each_state_bit<F>(&self, f: F)
@@ -91,8 +94,8 @@ where
         self.stmt_trans.gen_set.iter().for_each(f)
     }
 
-    pub fn new(results: DataflowResults<'tcx, BD>) -> Self {
-        let bits_per_block = results.sets().bits_per_block();
+    pub fn new(results: DR) -> Self {
+        let bits_per_block = results.borrow().sets().bits_per_block();
         let curr_state = BitSet::new_empty(bits_per_block);
         let stmt_trans = GenKillSet::from_elem(HybridBitSet::new_empty(bits_per_block));
         FlowAtLocation {
@@ -104,7 +107,7 @@ where
 
     /// Access the underlying operator.
     pub fn operator(&self) -> &BD {
-        self.base_results.operator()
+        self.base_results.borrow().operator()
     }
 
     pub fn contains(&self, x: BD::Idx) -> bool {
@@ -134,27 +137,31 @@ where
     }
 }
 
-impl<'tcx, BD> FlowsAtLocation for FlowAtLocation<'tcx, BD>
-    where BD: BitDenotation<'tcx>
+impl<'tcx, BD, DR> FlowsAtLocation for FlowAtLocation<'tcx, BD, DR>
+where
+    BD: BitDenotation<'tcx>,
+    DR: Borrow<DataflowResults<'tcx, BD>>,
 {
     fn reset_to_entry_of(&mut self, bb: BasicBlock) {
-        self.curr_state.overwrite(self.base_results.sets().entry_set_for(bb.index()));
+        self.curr_state.overwrite(self.base_results.borrow().sets().entry_set_for(bb.index()));
     }
 
     fn reset_to_exit_of(&mut self, bb: BasicBlock) {
         self.reset_to_entry_of(bb);
-        let trans = self.base_results.sets().trans_for(bb.index());
+        let trans = self.base_results.borrow().sets().trans_for(bb.index());
         trans.apply(&mut self.curr_state)
     }
 
     fn reconstruct_statement_effect(&mut self, loc: Location) {
         self.stmt_trans.clear();
         self.base_results
+            .borrow()
             .operator()
             .before_statement_effect(&mut self.stmt_trans, loc);
         self.stmt_trans.apply(&mut self.curr_state);
 
         self.base_results
+            .borrow()
             .operator()
             .statement_effect(&mut self.stmt_trans, loc);
     }
@@ -162,11 +169,13 @@ impl<'tcx, BD> FlowsAtLocation for FlowAtLocation<'tcx, BD>
     fn reconstruct_terminator_effect(&mut self, loc: Location) {
         self.stmt_trans.clear();
         self.base_results
+            .borrow()
             .operator()
             .before_terminator_effect(&mut self.stmt_trans, loc);
         self.stmt_trans.apply(&mut self.curr_state);
 
         self.base_results
+            .borrow()
             .operator()
             .terminator_effect(&mut self.stmt_trans, loc);
     }
@@ -177,9 +186,10 @@ impl<'tcx, BD> FlowsAtLocation for FlowAtLocation<'tcx, BD>
 }
 
 
-impl<'tcx, T> FlowAtLocation<'tcx, T>
+impl<'tcx, T, DR> FlowAtLocation<'tcx, T, DR>
 where
     T: HasMoveData<'tcx> + BitDenotation<'tcx, Idx = MovePathIndex>,
+    DR: Borrow<DataflowResults<'tcx, T>>,
 {
     pub fn has_any_child_of(&self, mpi: T::Idx) -> Option<T::Idx> {
         // We process `mpi` before the loop below, for two reasons:
diff --git a/src/librustc_mir/dataflow/impls/storage_liveness.rs b/src/librustc_mir/dataflow/impls/storage_liveness.rs
index d2003993d4506..7fa950cb98d34 100644
--- a/src/librustc_mir/dataflow/impls/storage_liveness.rs
+++ b/src/librustc_mir/dataflow/impls/storage_liveness.rs
@@ -1,7 +1,13 @@
 pub use super::*;
 
 use rustc::mir::*;
+use rustc::mir::visit::{
+    PlaceContext, Visitor, NonMutatingUseContext,
+};
+use std::cell::RefCell;
 use crate::dataflow::BitDenotation;
+use crate::dataflow::HaveBeenBorrowedLocals;
+use crate::dataflow::{DataflowResults, DataflowResultsCursor, DataflowResultsRefCursor};
 
 #[derive(Copy, Clone)]
 pub struct MaybeStorageLive<'a, 'tcx> {
@@ -27,7 +33,9 @@ impl<'a, 'tcx> BitDenotation<'tcx> for MaybeStorageLive<'a, 'tcx> {
     }
 
     fn start_block_effect(&self, _on_entry: &mut BitSet<Local>) {
-        // Nothing is live on function entry
+        // Nothing is live on function entry (generators only have a self
+        // argument, and we don't care about that)
+        assert_eq!(1, self.body.arg_count);
     }
 
     fn statement_effect(&self,
@@ -63,3 +71,123 @@ impl<'a, 'tcx> BottomValue for MaybeStorageLive<'a, 'tcx> {
     /// bottom = dead
     const BOTTOM_VALUE: bool = false;
 }
+
+/// Dataflow analysis that determines whether each local requires storage at a
+/// given location; i.e. whether its storage can go away without being observed.
+pub struct RequiresStorage<'mir, 'tcx> {
+    body: &'mir Body<'tcx>,
+    borrowed_locals:
+        RefCell<DataflowResultsRefCursor<'mir, 'tcx, HaveBeenBorrowedLocals<'mir, 'tcx>>>,
+}
+
+impl<'mir, 'tcx: 'mir> RequiresStorage<'mir, 'tcx> {
+    pub fn new(
+        body: &'mir Body<'tcx>,
+        borrowed_locals: &'mir DataflowResults<'tcx, HaveBeenBorrowedLocals<'mir, 'tcx>>,
+    ) -> Self {
+        RequiresStorage {
+            body,
+            borrowed_locals: RefCell::new(DataflowResultsCursor::new(borrowed_locals, body)),
+        }
+    }
+
+    pub fn body(&self) -> &Body<'tcx> {
+        self.body
+    }
+}
+
+impl<'mir, 'tcx> BitDenotation<'tcx> for RequiresStorage<'mir, 'tcx> {
+    type Idx = Local;
+    fn name() -> &'static str { "requires_storage" }
+    fn bits_per_block(&self) -> usize {
+        self.body.local_decls.len()
+    }
+
+    fn start_block_effect(&self, _sets: &mut BitSet<Local>) {
+        // Nothing is live on function entry (generators only have a self
+        // argument, and we don't care about that)
+        assert_eq!(1, self.body.arg_count);
+    }
+
+    fn statement_effect(&self,
+                        sets: &mut GenKillSet<Local>,
+                        loc: Location) {
+        self.check_for_move(sets, loc);
+        self.check_for_borrow(sets, loc);
+
+        let stmt = &self.body[loc.block].statements[loc.statement_index];
+        match stmt.kind {
+            StatementKind::StorageLive(l) => sets.gen(l),
+            StatementKind::StorageDead(l) => sets.kill(l),
+            StatementKind::Assign(ref place, _)
+            | StatementKind::SetDiscriminant { ref place, .. } => {
+                place.base_local().map(|l| sets.gen(l));
+            }
+            StatementKind::InlineAsm(box InlineAsm { ref outputs, .. }) => {
+                for p in &**outputs {
+                    p.base_local().map(|l| sets.gen(l));
+                }
+            }
+            _ => (),
+        }
+    }
+
+    fn terminator_effect(&self,
+                         sets: &mut GenKillSet<Local>,
+                         loc: Location) {
+        self.check_for_move(sets, loc);
+        self.check_for_borrow(sets, loc);
+    }
+
+    fn propagate_call_return(
+        &self,
+        in_out: &mut BitSet<Local>,
+        _call_bb: mir::BasicBlock,
+        _dest_bb: mir::BasicBlock,
+        dest_place: &mir::Place<'tcx>,
+    ) {
+        dest_place.base_local().map(|l| in_out.insert(l));
+    }
+}
+
+impl<'mir, 'tcx> RequiresStorage<'mir, 'tcx> {
+    /// Kill locals that are fully moved and have not been borrowed.
+    fn check_for_move(&self, sets: &mut GenKillSet<Local>, loc: Location) {
+        let mut visitor = MoveVisitor {
+            sets,
+            borrowed_locals: &self.borrowed_locals,
+        };
+        visitor.visit_location(self.body, loc);
+    }
+
+    /// Gen locals that are newly borrowed. This includes borrowing any part of
+    /// a local (we rely on this behavior of `HaveBeenBorrowedLocals`).
+    fn check_for_borrow(&self, sets: &mut GenKillSet<Local>, loc: Location) {
+        let mut borrowed_locals = self.borrowed_locals.borrow_mut();
+        borrowed_locals.seek(loc);
+        borrowed_locals.each_gen_bit(|l| sets.gen(l));
+    }
+}
+
+impl<'mir, 'tcx> BottomValue for RequiresStorage<'mir, 'tcx> {
+    /// bottom = dead
+    const BOTTOM_VALUE: bool = false;
+}
+
+struct MoveVisitor<'a, 'mir, 'tcx> {
+    borrowed_locals:
+        &'a RefCell<DataflowResultsRefCursor<'mir, 'tcx, HaveBeenBorrowedLocals<'mir, 'tcx>>>,
+    sets: &'a mut GenKillSet<Local>,
+}
+
+impl<'a, 'mir: 'a, 'tcx> Visitor<'tcx> for MoveVisitor<'a, 'mir, 'tcx> {
+    fn visit_local(&mut self, local: &Local, context: PlaceContext, loc: Location) {
+        if PlaceContext::NonMutatingUse(NonMutatingUseContext::Move) == context {
+            let mut borrowed_locals = self.borrowed_locals.borrow_mut();
+            borrowed_locals.seek(loc);
+            if !borrowed_locals.contains(*local) {
+                self.sets.kill(*local);
+            }
+        }
+    }
+}
diff --git a/src/librustc_mir/dataflow/mod.rs b/src/librustc_mir/dataflow/mod.rs
index 80f65a9c8d04e..25e7a5d9cf378 100644
--- a/src/librustc_mir/dataflow/mod.rs
+++ b/src/librustc_mir/dataflow/mod.rs
@@ -17,7 +17,7 @@ use std::io;
 use std::path::PathBuf;
 use std::usize;
 
-pub use self::impls::{MaybeStorageLive};
+pub use self::impls::{MaybeStorageLive, RequiresStorage};
 pub use self::impls::{MaybeInitializedPlaces, MaybeUninitializedPlaces};
 pub use self::impls::DefinitelyInitializedPlaces;
 pub use self::impls::EverInitializedPlaces;
@@ -344,6 +344,99 @@ pub(crate) trait DataflowResultsConsumer<'a, 'tcx: 'a> {
     fn body(&self) -> &'a Body<'tcx>;
 }
 
+/// Allows iterating dataflow results in a flexible and reasonably fast way.
+pub struct DataflowResultsCursor<'mir, 'tcx, BD, DR = DataflowResults<'tcx, BD>>
+where
+    BD: BitDenotation<'tcx>,
+    DR: Borrow<DataflowResults<'tcx, BD>>,
+{
+    flow_state: FlowAtLocation<'tcx, BD, DR>,
+
+    // The statement (or terminator) whose effect has been reconstructed in
+    // flow_state.
+    curr_loc: Option<Location>,
+
+    body: &'mir Body<'tcx>,
+}
+
+pub type DataflowResultsRefCursor<'mir, 'tcx, BD> =
+    DataflowResultsCursor<'mir, 'tcx, BD, &'mir DataflowResults<'tcx, BD>>;
+
+impl<'mir, 'tcx, BD, DR> DataflowResultsCursor<'mir, 'tcx, BD, DR>
+where
+    BD: BitDenotation<'tcx>,
+    DR: Borrow<DataflowResults<'tcx, BD>>,
+{
+    pub fn new(result: DR, body: &'mir Body<'tcx>) -> Self {
+        DataflowResultsCursor {
+            flow_state: FlowAtLocation::new(result),
+            curr_loc: None,
+            body,
+        }
+    }
+
+    /// Seek to the given location in MIR. This method is fast if you are
+    /// traversing your MIR statements in order.
+    ///
+    /// After calling `seek`, the current state will reflect all effects up to
+    /// and including the `before_statement_effect` of the statement at location
+    /// `loc`. The `statement_effect` of the statement at `loc` will be
+    /// available as the current effect (see e.g. `each_gen_bit`).
+    ///
+    /// If `loc.statement_index` equals the number of statements in the block,
+    /// we will reconstruct the terminator effect in the same way as described
+    /// above.
+    pub fn seek(&mut self, loc: Location) {
+        if self.curr_loc.map(|cur| loc == cur).unwrap_or(false) {
+            return;
+        }
+
+        let start_index;
+        let should_reset = match self.curr_loc {
+            None => true,
+            Some(cur)
+                if loc.block != cur.block || loc.statement_index < cur.statement_index => true,
+            _ => false,
+        };
+        if should_reset {
+            self.flow_state.reset_to_entry_of(loc.block);
+            start_index = 0;
+        } else {
+            let curr_loc = self.curr_loc.unwrap();
+            start_index = curr_loc.statement_index;
+            // Apply the effect from the last seek to the current state.
+            self.flow_state.apply_local_effect(curr_loc);
+        }
+
+        for stmt in start_index..loc.statement_index {
+            let mut stmt_loc = loc;
+            stmt_loc.statement_index = stmt;
+            self.flow_state.reconstruct_statement_effect(stmt_loc);
+            self.flow_state.apply_local_effect(stmt_loc);
+        }
+
+        if loc.statement_index == self.body[loc.block].statements.len() {
+            self.flow_state.reconstruct_terminator_effect(loc);
+        } else {
+            self.flow_state.reconstruct_statement_effect(loc);
+        }
+        self.curr_loc = Some(loc);
+    }
+
+    /// Return whether the current state contains bit `x`.
+    pub fn contains(&self, x: BD::Idx) -> bool {
+        self.flow_state.contains(x)
+    }
+
+    /// Iterate over each `gen` bit in the current effect (invoke `seek` first).
+    pub fn each_gen_bit<F>(&self, f: F)
+    where
+        F: FnMut(BD::Idx),
+    {
+        self.flow_state.each_gen_bit(f)
+    }
+}
+
 pub fn state_for_location<'tcx, T: BitDenotation<'tcx>>(loc: Location,
                                                         analysis: &T,
                                                         result: &DataflowResults<'tcx, T>,
diff --git a/src/librustc_mir/transform/generator.rs b/src/librustc_mir/transform/generator.rs
index ba8c47c665e03..6d6bce5ffbbc7 100644
--- a/src/librustc_mir/transform/generator.rs
+++ b/src/librustc_mir/transform/generator.rs
@@ -68,7 +68,7 @@ use crate::transform::simplify;
 use crate::transform::no_landing_pads::no_landing_pads;
 use crate::dataflow::{DataflowResults, DataflowResultsConsumer, FlowAtLocation};
 use crate::dataflow::{do_dataflow, DebugFormatted, state_for_location};
-use crate::dataflow::{MaybeStorageLive, HaveBeenBorrowedLocals};
+use crate::dataflow::{MaybeStorageLive, HaveBeenBorrowedLocals, RequiresStorage};
 use crate::util::dump_mir;
 use crate::util::liveness;
 
@@ -437,16 +437,18 @@ fn locals_live_across_suspend_points(
 
     // Calculate the MIR locals which have been previously
     // borrowed (even if they are still active).
-    // This is only used for immovable generators.
-    let borrowed_locals = if !movable {
-        let analysis = HaveBeenBorrowedLocals::new(body);
-        let result =
-            do_dataflow(tcx, body, def_id, &[], &dead_unwinds, analysis,
-                        |bd, p| DebugFormatted::new(&bd.body().local_decls[p]));
-        Some((analysis, result))
-    } else {
-        None
-    };
+    let borrowed_locals_analysis = HaveBeenBorrowedLocals::new(body);
+    let borrowed_locals_result =
+        do_dataflow(tcx, body, def_id, &[], &dead_unwinds, borrowed_locals_analysis,
+                    |bd, p| DebugFormatted::new(&bd.body().local_decls[p]));
+
+    // Calculate the MIR locals that we actually need to keep storage around
+    // for.
+    let requires_storage_analysis = RequiresStorage::new(body, &borrowed_locals_result);
+    let requires_storage =
+        do_dataflow(tcx, body, def_id, &[], &dead_unwinds, requires_storage_analysis,
+                    |bd, p| DebugFormatted::new(&bd.body().local_decls[p]));
+    let requires_storage_analysis = RequiresStorage::new(body, &borrowed_locals_result);
 
     // Calculate the liveness of MIR locals ignoring borrows.
     let mut live_locals = liveness::LiveVarSet::new_empty(body.local_decls.len());
@@ -471,10 +473,10 @@ fn locals_live_across_suspend_points(
                 statement_index: data.statements.len(),
             };
 
-            if let Some((ref analysis, ref result)) = borrowed_locals {
+            if !movable {
                 let borrowed_locals = state_for_location(loc,
-                                                         analysis,
-                                                         result,
+                                                         &borrowed_locals_analysis,
+                                                         &borrowed_locals_result,
                                                          body);
                 // The `liveness` variable contains the liveness of MIR locals ignoring borrows.
                 // This is correct for movable generators since borrows cannot live across
@@ -489,27 +491,34 @@ fn locals_live_across_suspend_points(
                 liveness.outs[block].union(&borrowed_locals);
             }
 
-            let mut storage_liveness = state_for_location(loc,
-                                                          &storage_live_analysis,
-                                                          &storage_live,
-                                                          body);
+            let storage_liveness = state_for_location(loc,
+                                                      &storage_live_analysis,
+                                                      &storage_live,
+                                                      body);
 
             // Store the storage liveness for later use so we can restore the state
             // after a suspension point
             storage_liveness_map.insert(block, storage_liveness.clone());
 
-            // Mark locals without storage statements as always having live storage
-            storage_liveness.union(&ignored.0);
+            let mut storage_required = state_for_location(loc,
+                                                          &requires_storage_analysis,
+                                                          &requires_storage,
+                                                          body);
+
+            // Mark locals without storage statements as always requiring storage
+            storage_required.union(&ignored.0);
 
             // Locals live are live at this point only if they are used across
             // suspension points (the `liveness` variable)
-            // and their storage is live (the `storage_liveness` variable)
-            let mut live_locals_here = storage_liveness;
+            // and their storage is required (the `storage_required` variable)
+            let mut live_locals_here = storage_required;
             live_locals_here.intersect(&liveness.outs[block]);
 
             // The generator argument is ignored
             live_locals_here.remove(self_arg());
 
+            debug!("loc = {:?}, live_locals_here = {:?}", loc, live_locals_here);
+
             // Add the locals live at this suspension point to the set of locals which live across
             // any suspension points
             live_locals.union(&live_locals_here);
@@ -517,6 +526,7 @@ fn locals_live_across_suspend_points(
             live_locals_at_suspension_points.push(live_locals_here);
         }
     }
+    debug!("live_locals = {:?}", live_locals);
 
     // Renumber our liveness_map bitsets to include only the locals we are
     // saving.
@@ -529,8 +539,8 @@ fn locals_live_across_suspend_points(
         body,
         &live_locals,
         &ignored,
-        storage_live,
-        storage_live_analysis);
+        requires_storage,
+        requires_storage_analysis);
 
     LivenessInfo {
         live_locals,
@@ -567,8 +577,8 @@ fn compute_storage_conflicts(
     body: &'mir Body<'tcx>,
     stored_locals: &liveness::LiveVarSet,
     ignored: &StorageIgnored,
-    storage_live: DataflowResults<'tcx, MaybeStorageLive<'mir, 'tcx>>,
-    _storage_live_analysis: MaybeStorageLive<'mir, 'tcx>,
+    requires_storage: DataflowResults<'tcx, RequiresStorage<'mir, 'tcx>>,
+    _requires_storage_analysis: RequiresStorage<'mir, 'tcx>,
 ) -> BitMatrix<GeneratorSavedLocal, GeneratorSavedLocal> {
     assert_eq!(body.local_decls.len(), ignored.0.domain_size());
     assert_eq!(body.local_decls.len(), stored_locals.domain_size());
@@ -584,9 +594,9 @@ fn compute_storage_conflicts(
     let mut visitor = StorageConflictVisitor {
         body,
         stored_locals: &stored_locals,
-        local_conflicts: BitMatrix::from_row_n(&ineligible_locals, body.local_decls.len())
+        local_conflicts: BitMatrix::from_row_n(&ineligible_locals, body.local_decls.len()),
     };
-    let mut state = FlowAtLocation::new(storage_live);
+    let mut state = FlowAtLocation::new(requires_storage);
     visitor.analyze_results(&mut state);
     let local_conflicts = visitor.local_conflicts;
 
@@ -627,7 +637,7 @@ struct StorageConflictVisitor<'body, 'tcx, 's> {
 impl<'body, 'tcx, 's> DataflowResultsConsumer<'body, 'tcx>
     for StorageConflictVisitor<'body, 'tcx, 's>
 {
-    type FlowState = FlowAtLocation<'tcx, MaybeStorageLive<'body, 'tcx>>;
+    type FlowState = FlowAtLocation<'tcx, RequiresStorage<'body, 'tcx>>;
 
     fn body(&self) -> &'body Body<'tcx> {
         self.body
@@ -657,7 +667,7 @@ impl<'body, 'tcx, 's> DataflowResultsConsumer<'body, 'tcx>
 
 impl<'body, 'tcx, 's> StorageConflictVisitor<'body, 'tcx, 's> {
     fn apply_state(&mut self,
-                   flow_state: &FlowAtLocation<'tcx, MaybeStorageLive<'body, 'tcx>>,
+                   flow_state: &FlowAtLocation<'tcx, RequiresStorage<'body, 'tcx>>,
                    loc: Location) {
         // Ignore unreachable blocks.
         match self.body.basic_blocks()[loc.block].terminator().kind {
diff --git a/src/test/run-pass/async-await/async-fn-size-moved-locals.rs b/src/test/run-pass/async-await/async-fn-size-moved-locals.rs
new file mode 100644
index 0000000000000..139be7fe0132b
--- /dev/null
+++ b/src/test/run-pass/async-await/async-fn-size-moved-locals.rs
@@ -0,0 +1,98 @@
+// Test that we don't duplicate storage for futures moved around in .await, and
+// for futures moved into other futures.
+//
+// The exact sizes can change by a few bytes (we'd like to know when they do).
+// What we don't want to see is the wrong multiple of 1024 (the size of BigFut)
+// being reflected in the size.
+//
+// See issue #59123 for a full explanation.
+
+// edition:2018
+
+#![feature(async_await)]
+
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+const BIG_FUT_SIZE: usize = 1024;
+struct BigFut([u8; BIG_FUT_SIZE]);
+
+impl BigFut {
+    fn new() -> Self {
+        BigFut([0; BIG_FUT_SIZE])
+    } }
+
+impl Drop for BigFut {
+    fn drop(&mut self) {}
+}
+
+impl Future for BigFut {
+    type Output = ();
+
+    fn poll(self: Pin<&mut Self>, _ctx: &mut Context<'_>) -> Poll<Self::Output> {
+        Poll::Ready(())
+    }
+}
+
+#[allow(dead_code)]
+struct Joiner {
+    a: Option<BigFut>,
+    b: Option<BigFut>,
+    c: Option<BigFut>,
+}
+
+impl Future for Joiner {
+    type Output = ();
+
+    fn poll(self: Pin<&mut Self>, _ctx: &mut Context<'_>) -> Poll<Self::Output> {
+        Poll::Ready(())
+    }
+}
+
+fn noop() {}
+
+async fn single() {
+    let x = BigFut::new();
+    x.await;
+}
+
+async fn single_with_noop() {
+    let x = BigFut::new();
+    noop();
+    x.await;
+}
+
+async fn joined() {
+    let a = BigFut::new();
+    let b = BigFut::new();
+    let c = BigFut::new();
+
+    let joiner = Joiner {
+        a: Some(a),
+        b: Some(b),
+        c: Some(c),
+    };
+    joiner.await
+}
+
+async fn joined_with_noop() {
+    let a = BigFut::new();
+    let b = BigFut::new();
+    let c = BigFut::new();
+
+    let joiner = Joiner {
+        a: Some(a),
+        b: Some(b),
+        c: Some(c),
+    };
+    noop();
+    joiner.await
+}
+
+fn main() {
+    assert_eq!(1028, std::mem::size_of_val(&single()));
+    assert_eq!(1032, std::mem::size_of_val(&single_with_noop()));
+    assert_eq!(3084, std::mem::size_of_val(&joined()));
+    assert_eq!(3084, std::mem::size_of_val(&joined_with_noop()));
+}
diff --git a/src/test/run-pass/generator/size-moved-locals.rs b/src/test/run-pass/generator/size-moved-locals.rs
new file mode 100644
index 0000000000000..37e2e0cfdcccf
--- /dev/null
+++ b/src/test/run-pass/generator/size-moved-locals.rs
@@ -0,0 +1,62 @@
+// Test that we don't duplicate storage for a variable that is moved to another
+// binding. This used to happen in the presence of unwind and drop edges (see
+// `complex` below.)
+//
+// The exact sizes here can change (we'd like to know when they do). What we
+// don't want to see is the `complex` generator size being upwards of 2048 bytes
+// (which would indicate it is reserving space for two copies of Foo.)
+//
+// See issue #59123 for a full explanation.
+
+// edition:2018
+
+#![feature(generators, generator_trait)]
+
+use std::ops::Generator;
+
+const FOO_SIZE: usize = 1024;
+struct Foo([u8; FOO_SIZE]);
+
+impl Drop for Foo {
+    fn drop(&mut self) {}
+}
+
+fn move_before_yield() -> impl Generator<Yield = (), Return = ()> {
+    static || {
+        let first = Foo([0; FOO_SIZE]);
+        let _second = first;
+        yield;
+        // _second dropped here
+    }
+}
+
+fn noop() {}
+
+fn move_before_yield_with_noop() -> impl Generator<Yield = (), Return = ()> {
+    static || {
+        let first = Foo([0; FOO_SIZE]);
+        noop();
+        let _second = first;
+        yield;
+        // _second dropped here
+    }
+}
+
+// Today we don't have NRVO (we allocate space for both `first` and `second`,)
+// but we can overlap `first` with `_third`.
+fn overlap_move_points() -> impl Generator<Yield = (), Return = ()> {
+    static || {
+        let first = Foo([0; FOO_SIZE]);
+        yield;
+        let second = first;
+        yield;
+        let _third = second;
+        yield;
+    }
+}
+
+fn main() {
+    assert_eq!(1028, std::mem::size_of_val(&move_before_yield()));
+    assert_eq!(1032, std::mem::size_of_val(&move_before_yield_with_noop()));
+    assert_eq!(2056, std::mem::size_of_val(&overlap_move_points()));
+}