DAOS-17737 dtx: handle race between DTX refresh and DTX abort #16535

Draft: wants to merge 1 commit into master
56 changes: 16 additions & 40 deletions src/dtx/dtx_common.c
@@ -851,18 +851,19 @@ dtx_shares_fini(struct dtx_handle *dth)
 int
 dtx_handle_reinit(struct dtx_handle *dth)
 {
-        D_ASSERT(dth->dth_aborted == 0);
-        D_ASSERT(dth->dth_already == 0);
 
         if (dth->dth_modification_cnt > 0) {
                 D_ASSERT(dth->dth_ent != NULL);
                 D_ASSERT(dth->dth_pinned != 0);
         }
-        dth->dth_modify_shared = 0;
-        dth->dth_active = 0;
+        D_ASSERT(dth->dth_already == 0);
+
+        dth->dth_modify_shared = 0;
+        dth->dth_active = 0;
         dth->dth_touched_leader_oid = 0;
-        dth->dth_local_tx_started = 0;
-        dth->dth_cos_done = 0;
+        dth->dth_aborted = 0;
+        dth->dth_local_tx_started = 0;
+        dth->dth_cos_done = 0;
 
         dth->dth_op_seq = 0;
         dth->dth_oid_cnt = 0;
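
The hunk above is the heart of the fix: dtx_handle_reinit() used to assert dth_aborted == 0, and now clears the flag instead, so a handle that lost a race with DTX abort can be reinitialized for a local retry. A minimal sketch of that contract, assuming a stand-in struct in place of the real DAOS handle definition:

#include <assert.h>

/* Stand-in for the relevant dtx_handle bits; the real definition lives
 * in the DAOS DTX headers. */
struct dtx_handle_sketch {
        unsigned int dth_aborted:1;
        unsigned int dth_already:1;
        unsigned int dth_active:1;
};

/* Mirrors the new reinit behavior: a racing abort no longer trips an
 * assertion; it is simply forgotten before the local retry. */
static int
dtx_handle_reinit_sketch(struct dtx_handle_sketch *dth)
{
        assert(dth->dth_already == 0);  /* committed elsewhere: never reuse */
        dth->dth_active = 0;
        dth->dth_aborted = 0;           /* new: cleared instead of asserted */
        return 0;
}

int
main(void)
{
        struct dtx_handle_sketch dth = { .dth_aborted = 1 };

        return dtx_handle_reinit_sketch(&dth); /* 0: handle is reusable */
}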
@@ -1267,7 +1268,7 @@ dtx_leader_end(struct dtx_leader_handle *dlh, struct ds_cont_child *cont, int re
         struct dtx_memberships *mbs;
         size_t                  size;
         uint32_t                flags;
-        int                     status = -1;
+        int                     status;
         int                     rc = 0;
         bool                    aborted = false;
 
@@ -1284,41 +1285,16 @@ dtx_leader_end(struct dtx_leader_handle *dlh, struct ds_cont_child *cont, int re
         if (dth->dth_solo)
                 goto out;
 
-        if (dth->dth_need_validation) {
-                /* During waiting for bulk data transfer or other non-leaders, the DTX
-                 * status may be changes by others (such as DTX resync or DTX refresh)
-                 * by race. Let's check it before handling the case of 'result < 0' to
-                 * avoid aborting 'ready' one.
-                 */
-                status = vos_dtx_validation(dth);
-                if (unlikely(status == DTX_ST_COMMITTED || status == DTX_ST_COMMITTABLE ||
-                             status == DTX_ST_COMMITTING))
-                        D_GOTO(out, result = -DER_ALREADY);
-        }
-
-        if (result < 0)
+        /* While waiting for bulk data transfer or other non-leaders, the DTX status
+         * may be changed by others (such as DTX resync or DTX refresh) by race. Check
+         * it before handling the case of 'result < 0' to avoid aborting a 'ready' one.
+         */
+        status = vos_dtx_validation(dth);
+        if (status != -DER_ALREADY && result < 0)
                 goto abort;
 
-        switch (status) {
-        case -1:
-                break;
-        case DTX_ST_PREPARED:
-                if (likely(!dth->dth_aborted))
-                        break;
-                /* Fall through */
-        case DTX_ST_INITED:
-        case DTX_ST_PREPARING:
-                aborted = true;
-                result = -DER_AGAIN;
-                goto out;
-        case DTX_ST_ABORTED:
-        case DTX_ST_ABORTING:
-                aborted = true;
-                result = -DER_INPROGRESS;
-                goto out;
-        default:
-                D_ASSERTF(0, "Unexpected DTX "DF_DTI" status %d\n", DP_DTI(&dth->dth_xid), status);
-        }
+        if (status < 0)
+                D_GOTO(out, result = status);
 
         if (dlh->dlh_relay)
                 goto out;
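Taken together, the dtx_leader_end() change means vos_dtx_validation() now runs unconditionally and reports plain error codes, so the old DTX_ST_* switch disappears: -DER_ALREADY suppresses the abort path, any other negative status is propagated, and 0 falls through toward commit. A hedged sketch of that decision flow; the two error constants below are placeholders, not the real daos_errno.h values:

#include <stdio.h>

#define DER_ALREADY 1001 /* placeholder value; see daos_errno.h */
#define DER_IO      1002 /* placeholder value; see daos_errno.h */

/* Stand-in for vos_dtx_validation(): 0 when the DTX is still usable,
 * -DER_ALREADY when a racing actor already committed it, and another
 * negative value when it must fail (e.g. aborted by race). */
static int
validation_stub(int simulated_status)
{
        return simulated_status;
}

/* Mirrors the new tail of dtx_leader_end(). */
static int
leader_end_tail(int result, int simulated_status)
{
        int status = validation_stub(simulated_status);

        if (status != -DER_ALREADY && result < 0)
                return result;  /* the "goto abort" path in the real code */
        if (status < 0)
                return status;  /* a racing commit wins over a local failure */
        return 0;               /* proceed toward commit */
}

int
main(void)
{
        /* A racing commit suppresses the abort path ... */
        printf("%d\n", leader_end_tail(-DER_IO, -DER_ALREADY)); /* -1001 */
        /* ... while an ordinary local failure still aborts. */
        printf("%d\n", leader_end_tail(-DER_IO, 0));            /* -1002 */
        return 0;
}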
53 changes: 5 additions & 48 deletions src/dtx/dtx_rpc.c
@@ -1363,59 +1363,16 @@ dtx_refresh(struct dtx_handle *dth, struct ds_cont_child *cont)
         if (DAOS_FAIL_CHECK(DAOS_DTX_NO_RETRY))
                 return -DER_IO;
 
-        rc = dtx_refresh_internal(cont, &dth->dth_share_tbd_count,
-                                  &dth->dth_share_tbd_list,
-                                  &dth->dth_share_cmt_list,
-                                  &dth->dth_share_abt_list,
+        rc = dtx_refresh_internal(cont, &dth->dth_share_tbd_count, &dth->dth_share_tbd_list,
+                                  &dth->dth_share_cmt_list, &dth->dth_share_abt_list,
                                   &dth->dth_share_act_list, true);
 
-        /* If we can resolve the DTX status, then return -DER_AGAIN
-         * to the caller that will retry related operation locally.
-         */
         if (rc == 0) {
                 D_ASSERT(dth->dth_share_tbd_count == 0);
 
-                if (dth->dth_need_validation) {
-                        rc = vos_dtx_validation(dth);
-                        switch (rc) {
-                        case DTX_ST_INITED:
-                                if (!dth->dth_aborted)
-                                        break;
-                                /* Fall through */
-                        case DTX_ST_PREPARED:
-                        case DTX_ST_PREPARING:
-                                /* The DTX has been ever aborted and related resent RPC
-                                 * is in processing. Return -DER_AGAIN to make this ULT
-                                 * to retry sometime later without dtx_abort().
-                                 */
-                                rc = -DER_AGAIN;
-                                break;
-                        case DTX_ST_ABORTED:
-                                D_ASSERT(dth->dth_ent == NULL);
-                                /* Aborted, return -DER_INPROGRESS for client retry.
-                                 *
-                                 * Fall through.
-                                 */
-                        case DTX_ST_ABORTING:
-                                rc = -DER_INPROGRESS;
-                                break;
-                        case DTX_ST_COMMITTED:
-                        case DTX_ST_COMMITTING:
-                        case DTX_ST_COMMITTABLE:
-                                /* Aborted then prepared/committed by race.
-                                 * Return -DER_ALREADY to avoid repeated modification.
-                                 */
-                                dth->dth_already = 1;
-                                rc = -DER_ALREADY;
-                                break;
-                        default:
-                                D_ASSERTF(0, "Unexpected DTX "DF_DTI" status %d\n",
-                                          DP_DTI(&dth->dth_xid), rc);
-                        }
-                } else {
+                rc = vos_dtx_validation(dth);
+                if (rc == 0) {
                         vos_dtx_cleanup(dth, false);
-                        dtx_handle_reinit(dth);
-                        rc = -DER_AGAIN;
+                        rc = dtx_handle_reinit(dth);
                 }
         }
-
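With the status switch gone, dtx_refresh() adopts a simpler contract: 0 means the handle was revalidated, cleaned up, and reinitialized, so the caller should redo the local operation; any validation failure is passed through unchanged. A sketch of the new tail under that reading, with trivial stubs standing in for the real DAOS calls:

#include <stdbool.h>

struct dtx_handle; /* opaque here; defined in the DAOS DTX headers */

/* Trivial stubs standing in for the calls in the hunk above. */
static int
vos_dtx_validation_stub(struct dtx_handle *dth)
{
        (void)dth;
        return 0; /* pretend the DTX is still usable */
}

static void
vos_dtx_cleanup_stub(struct dtx_handle *dth, bool unpin)
{
        (void)dth;
        (void)unpin;
}

static int
dtx_handle_reinit_stub(struct dtx_handle *dth)
{
        (void)dth;
        return 0;
}

/* Mirrors the new tail of dtx_refresh(): rc == 0 now means "retry the
 * local operation", replacing the old -DER_AGAIN convention. */
int
dtx_refresh_tail(struct dtx_handle *dth, int rc)
{
        if (rc == 0) {
                rc = vos_dtx_validation_stub(dth);
                if (rc == 0) {
                        vos_dtx_cleanup_stub(dth, false);
                        rc = dtx_handle_reinit_stub(dth);
                }
        }
        return rc;
}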
93 changes: 16 additions & 77 deletions src/object/srv_obj.c
@@ -1743,26 +1743,7 @@ obj_local_rw_internal(crt_rpc_t *rpc, struct obj_io_context *ioc, daos_iod_t *io
          * Let's check resent again before further process.
          */
         if (rc == 0 && obj_rpc_is_update(rpc) && sched_cur_seq() != sched_seq) {
-                if (dth->dth_need_validation) {
-                        daos_epoch_t epoch = 0;
-                        int          rc1;
-
-                        rc1 = dtx_handle_resend(ioc->ioc_vos_coh, &orw->orw_dti, &epoch, NULL);
-                        switch (rc1) {
-                        case 0:
-                                orw->orw_epoch = epoch;
-                                /* Fall through */
-                        case -DER_ALREADY:
-                                rc = -DER_ALREADY;
-                                break;
-                        case -DER_NONEXIST:
-                        case -DER_EP_OLD:
-                                break;
-                        default:
-                                rc = rc1;
-                                break;
-                        }
-                }
+                rc = vos_dtx_validation(dth);
 
                 /* For solo update, it will be handled via one-phase transaction.
                  * If there is CPU yield after its epoch generated, we will renew
@@ -1788,7 +1769,7 @@ obj_local_rw_internal(crt_rpc_t *rpc, struct obj_io_context *ioc, daos_iod_t *io
         }
 
         /* re-generate the recx_list if some akeys skipped */
-        if (skips != NULL && orwo->orw_rels.ca_arrays != NULL && orw->orw_nr != iods_nr)
+        if (rc == 0 && skips != NULL && orwo->orw_rels.ca_arrays != NULL && orw->orw_nr != iods_nr)
                 rc = obj_rw_recx_list_post(orw, orwo, skips, rc);
 
         rc = obj_rw_complete(rpc, ioc, ioh, rc, dth);
@@ -1803,7 +1784,7 @@ obj_local_rw_internal(crt_rpc_t *rpc, struct obj_io_context *ioc, daos_iod_t *io
         }
         if (iods_dup != NULL)
                 daos_iod_recx_free(iods_dup, iods_nr);
-        return unlikely(rc == -DER_ALREADY) ? 0 : rc;
+        return rc;
 }
 
 /* Extract local iods/offs/csums by orw_oid.id_shard from @orw */
@@ -2041,7 +2022,7 @@ obj_local_rw(crt_rpc_t *rpc, struct obj_io_context *ioc, struct dtx_handle *dth)
         if (dth != NULL && obj_dtx_need_refresh(dth, rc)) {
                 if (++retry < 3) {
                         rc = dtx_refresh(dth, ioc->ioc_coc);
-                        if (rc == -DER_AGAIN)
+                        if (rc == 0)
                                 goto again;
                 } else if (orw->orw_flags & ORF_MAYBE_STARVE) {
                         dsp = d_list_entry(dth->dth_share_tbd_list.next, struct dtx_share_peer,
@@ -2754,7 +2735,7 @@ ds_obj_tgt_update_handler(crt_rpc_t *rpc)
         rc = obj_local_rw(rpc, &ioc, dth);
         if (rc != 0)
                 DL_CDEBUG(
-                        rc == -DER_INPROGRESS || rc == -DER_TX_RESTART ||
+                        rc == -DER_INPROGRESS || rc == -DER_TX_RESTART || rc == -DER_ALREADY ||
                                 (rc == -DER_EXIST &&
                                  (orw->orw_api_flags & (DAOS_COND_DKEY_INSERT | DAOS_COND_AKEY_INSERT))) ||
                                 (rc == -DER_NONEXIST &&
@@ -3269,7 +3250,7 @@ obj_local_enum(struct obj_io_context *ioc, crt_rpc_t *rpc,
         if (obj_dtx_need_refresh(dth, rc)) {
                 rc = dtx_refresh(dth, ioc->ioc_coc);
                 /* After DTX refresh, re_pack will resume from the position at \@anchors. */
-                if (rc == -DER_AGAIN)
+                if (rc == 0)
                         goto re_pack;
         }
 
@@ -3577,44 +3558,15 @@ obj_local_punch(struct obj_punch_in *opi, crt_opcode_t opc, uint32_t shard_nr, u
         }
 
         rc = dtx_refresh(dth, ioc->ioc_coc);
-        if (rc != -DER_AGAIN)
+        if (rc != 0)
                 goto out;
 
-        if (unlikely(sched_cur_seq() == sched_seq))
-                goto again;
-
-        /*
-         * There is CPU yield after DTX start, and the resent RPC may be handled
-         * during that. Let's check resent again before further process.
-         */
-
-        if (dth->dth_need_validation) {
-                daos_epoch_t epoch = 0;
-                int          rc1;
-
-                rc1 = dtx_handle_resend(ioc->ioc_vos_coh, &opi->opi_dti, &epoch, NULL);
-                switch (rc1) {
-                case 0:
-                        opi->opi_epoch = epoch;
-                        /* Fall through */
-                case -DER_ALREADY:
-                        rc = -DER_ALREADY;
-                        break;
-                case -DER_NONEXIST:
-                case -DER_EP_OLD:
-                        break;
-                default:
-                        rc = rc1;
-                        break;
-                }
-        }
-
         /*
          * For solo punch, it will be handled via one-phase transaction. If there is CPU
          * yield after its epoch generated, we will renew the epoch, then we can use the
         * epoch to sort related solo DTXs based on their epochs.
          */
-        if (rc == -DER_AGAIN && dth->dth_solo) {
+        if (dth->dth_solo && sched_cur_seq() != sched_seq) {
                 struct dtx_epoch epoch;
 
                 epoch.oe_value = d_hlc_get();
@@ -3706,8 +3658,8 @@ obj_tgt_punch(struct obj_tgt_punch_args *otpa, uint32_t *shards, uint32_t count)
 exec:
         rc = obj_local_punch(opi, otpa->opc, count, shards, p_ioc, dth);
         if (rc != 0)
-                DL_CDEBUG(rc == -DER_INPROGRESS || rc == -DER_TX_RESTART ||
-                          (rc == -DER_NONEXIST && (opi->opi_api_flags & DAOS_COND_PUNCH)),
+                DL_CDEBUG(rc == -DER_INPROGRESS || rc == -DER_TX_RESTART || rc == -DER_ALREADY ||
+                          (rc == -DER_NONEXIST && (opi->opi_api_flags & DAOS_COND_PUNCH)),
                           DB_IO, DLOG_ERR, rc, DF_UOID, DP_UOID(opi->opi_oid));
 
 out:
@@ -4126,7 +4078,7 @@ obj_local_query(struct obj_tgt_query_args *otqa, struct obj_io_context *ioc, dao
                               p_recx, p_epoch, cell_size, stripe_size, dth);
         if (obj_dtx_need_refresh(dth, rc)) {
                 rc = dtx_refresh(dth, ioc->ioc_coc);
-                if (rc == -DER_AGAIN)
+                if (rc == 0)
                         goto again;
         }
 
@@ -4783,24 +4735,11 @@ ds_cpd_handle_one(crt_rpc_t *rpc, struct daos_cpd_sub_head *dcsh, struct daos_cp
          * Let's check resent again before further process.
          */
         if (rc == 0 && dth->dth_modification_cnt > 0 && sched_cur_seq() != sched_seq) {
-                if (dth->dth_need_validation) {
-                        daos_epoch_t epoch = 0;
-                        int          rc1;
-
-                        rc1 = dtx_handle_resend(ioc->ioc_vos_coh, &dcsh->dcsh_xid, &epoch, NULL);
-                        switch (rc1) {
-                        case 0:
-                        case -DER_ALREADY:
-                                D_GOTO(out, rc = -DER_ALREADY);
-                        case -DER_NONEXIST:
-                        case -DER_EP_OLD:
-                                break;
-                        default:
-                                D_GOTO(out, rc = rc1);
-                        }
-                }
+                rc = vos_dtx_validation(dth);
+                if (rc != 0)
+                        goto out;
 
-                if (rc == 0 && dth->dth_solo) {
+                if (dth->dth_solo) {
                         daos_epoch_t epoch = dcsh->dcsh_epoch.oe_value;
 
                         D_ASSERT(dcde->dcde_read_cnt == 0);
@@ -4968,7 +4907,7 @@ ds_cpd_handle_one_wrap(crt_rpc_t *rpc, struct daos_cpd_sub_head *dcsh,
         if (obj_dtx_need_refresh(dth, rc)) {
                 if (++retry < 3) {
                         rc = dtx_refresh(dth, ioc->ioc_coc);
-                        if (rc == -DER_AGAIN)
+                        if (rc == 0)
                                 goto again;
                 } else if (oci->oci_flags & ORF_MAYBE_STARVE) {
                         dsp = d_list_entry(dth->dth_share_tbd_list.next,
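Every object-layer call site above follows the same bounded-retry shape; only the sentinel changed from -DER_AGAIN to 0. A condensed, self-contained sketch of that loop, simplified from the obj_local_rw()/ds_cpd_handle_one_wrap() pattern with stand-in helpers and a placeholder conflict errno (the real code additionally special-cases ORF_MAYBE_STARVE after three attempts, which the sketch omits):

#include <stdbool.h>

#define DER_TX_BUSY 1012 /* placeholder conflict errno for the demo */

static int attempts;

/* Stands in for the local I/O; succeeds on the third attempt. */
static int
obj_local_op_stub(void)
{
        return ++attempts < 3 ? -DER_TX_BUSY : 0;
}

static bool
obj_dtx_need_refresh_stub(int rc)
{
        return rc == -DER_TX_BUSY;
}

/* Stands in for dtx_refresh(); returning 0 now means "retry locally". */
static int
dtx_refresh_stub(void)
{
        return 0;
}

int
main(void)
{
        int retry = 0;
        int rc;

again:
        rc = obj_local_op_stub();
        if (obj_dtx_need_refresh_stub(rc) && ++retry < 3) {
                rc = dtx_refresh_stub();
                if (rc == 0) /* was: rc == -DER_AGAIN */
                        goto again;
        }
        return rc; /* 0 after two refresh-and-retry rounds */
}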
44 changes: 8 additions & 36 deletions src/vos/vos_common.c
@@ -369,42 +369,13 @@ vos_tx_end(struct vos_container *cont, struct dtx_handle *dth_in,
                 dae->dae_preparing = 0;
         }
 
-        if (err == 0 && unlikely(dth->dth_need_validation && dth->dth_active)) {
-                /* Aborted by race during the yield for local TX commit. */
-                rc = vos_dtx_validation(dth);
-                switch (rc) {
-                case DTX_ST_INITED:
-                case DTX_ST_PREPARED:
-                case DTX_ST_PREPARING:
-                        /* The DTX has been ever aborted and related resent RPC
-                         * is in processing. Return -DER_AGAIN to make this ULT
-                         * to retry sometime later without dtx_abort().
-                         */
-                        err = -DER_AGAIN;
-                        break;
-                case DTX_ST_ABORTED:
-                        D_ASSERT(dae == NULL);
-                        /* Aborted, return -DER_INPROGRESS for client retry.
-                         *
-                         * Fall through.
-                         */
-                case DTX_ST_ABORTING:
-                        err = -DER_INPROGRESS;
-                        break;
-                case DTX_ST_COMMITTED:
-                case DTX_ST_COMMITTING:
-                case DTX_ST_COMMITTABLE:
-                        /* Aborted then prepared/committed by race.
-                         * Return -DER_ALREADY to avoid repeated modification.
-                         */
-                        dth->dth_already = 1;
-                        err = -DER_ALREADY;
-                        break;
-                default:
-                        D_ASSERTF(0, "Unexpected DTX "DF_DTI" status %d\n",
-                                  DP_DTI(&dth->dth_xid), rc);
-                }
-        } else if (dae != NULL) {
+        if (err == 0 && dth->dth_active) {
+                err = vos_dtx_validation(dth);
+                if (err != 0)
+                        goto out;
+        }
+
+        if (dae != NULL) {
                 if (dth->dth_solo) {
                         if (err == 0 && dae->dae_committing &&
                             cont->vc_solo_dtx_epoch < dth->dth_epoch)
@@ -429,6 +400,7 @@ vos_tx_end(struct vos_container *cont, struct dtx_handle *dth_in,
                 }
         }
 
+out:
         if (err != 0) {
                 /* Do not set dth->dth_pinned. Upper layer caller can do that via
                  * vos_dtx_cleanup() when necessary.
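vos_tx_end() gets the same treatment: the status switch collapses into a single validation whose failure jumps to the new out: label, and dth_pinned is deliberately left for the upper layer to resolve via vos_dtx_cleanup(). A stand-in sketch of the resulting shape, assuming a reduced handle struct:

#include <stdbool.h>

struct dtx_handle_sketch {
        bool dth_active;
};

/* Stand-in; the real call may return -DER_ALREADY, -DER_INPROGRESS, etc. */
static int
vos_dtx_validation_stub(struct dtx_handle_sketch *dth)
{
        (void)dth;
        return 0;
}

/* Mirrors the new shape of vos_tx_end()'s tail. */
static int
vos_tx_end_tail(struct dtx_handle_sketch *dth, int err)
{
        if (err == 0 && dth->dth_active) {
                err = vos_dtx_validation_stub(dth);
                if (err != 0)
                        goto out;
        }

        /* ... solo-DTX epoch bookkeeping elided ... */

out:
        /* On failure, dth_pinned is left untouched; the upper layer decides
         * whether to run vos_dtx_cleanup(). */
        return err;
}

int
main(void)
{
        struct dtx_handle_sketch dth = { .dth_active = true };

        return vos_tx_end_tail(&dth, 0);
}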