diff --git a/doc/source/whatsnew/v1.0.0.rst b/doc/source/whatsnew/v1.0.0.rst
index 98d861d999ea9..da651d95b1300 100644
--- a/doc/source/whatsnew/v1.0.0.rst
+++ b/doc/source/whatsnew/v1.0.0.rst
@@ -453,6 +453,7 @@ Groupby/resample/rolling
 - Bug in :meth:`DataFrameGroupBy.agg` not able to use lambda function with named aggregation (:issue:`27519`)
 - Bug in :meth:`DataFrame.groupby` losing column name information when grouping by a categorical column (:issue:`28787`)
 - Bug in :meth:`DataFrameGroupBy.rolling().quantile()` ignoring ``interpolation`` keyword argument (:issue:`28779`)
+- Bug in :meth:`DataFrame.groupby` where ``any``, ``all``, ``nunique`` and transform functions would incorrectly handle duplicate column labels (:issue:`21668`)
 
 Reshaping
 ^^^^^^^^^
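As context for the whatsnew entry, a minimal reproduction mirroring the regression test added at the bottom of this diff: grouping a frame with duplicate column labels now preserves both columns.

```python
import pandas as pd

df = pd.DataFrame([[True, True]], columns=["a", "a"])

# Prior to this fix, any/all over duplicate column labels could collapse
# or mislabel the result; it now round-trips with the original shape.
result = df.groupby([0]).all()
assert result.shape == (1, 2)
assert list(result.columns) == ["a", "a"]
```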
diff --git a/pandas/core/groupby/base.py b/pandas/core/groupby/base.py
index fed387cbeade4..407cd8342d486 100644
--- a/pandas/core/groupby/base.py
+++ b/pandas/core/groupby/base.py
@@ -3,8 +3,12 @@
 hold the whitelist of methods that are exposed on the
 SeriesGroupBy and the DataFrameGroupBy objects.
 """
+import collections
+
 from pandas.core.dtypes.common import is_list_like, is_scalar
 
+OutputKey = collections.namedtuple("OutputKey", ["label", "position"])
+
 
 class GroupByMixin:
     """
diff --git a/pandas/core/groupby/generic.py b/pandas/core/groupby/generic.py
index 0ca6ef043fffb..621666d2ddcff 100644
--- a/pandas/core/groupby/generic.py
+++ b/pandas/core/groupby/generic.py
@@ -10,7 +10,17 @@
 from functools import partial
 from textwrap import dedent
 import typing
-from typing import Any, Callable, FrozenSet, Iterable, Sequence, Type, Union, cast
+from typing import (
+    Any,
+    Callable,
+    FrozenSet,
+    Iterable,
+    Mapping,
+    Sequence,
+    Type,
+    Union,
+    cast,
+)
 
 import numpy as np
 
@@ -309,28 +319,91 @@ def _aggregate_multiple_funcs(self, arg):
 
         return DataFrame(results, columns=columns)
 
-    def _wrap_series_output(self, output, index, names=None):
-        """ common agg/transform wrapping logic """
-        output = output[self._selection_name]
+    def _wrap_series_output(
+        self, output: Mapping[base.OutputKey, Union[Series, np.ndarray]], index: Index,
+    ) -> Union[Series, DataFrame]:
+        """
+        Wraps the output of a SeriesGroupBy operation into the expected result.
+
+        Parameters
+        ----------
+        output : Mapping[base.OutputKey, Union[Series, np.ndarray]]
+            Data to wrap.
+        index : pd.Index
+            Index to apply to the output.
 
-        if names is not None:
-            return DataFrame(output, index=index, columns=names)
+        Returns
+        -------
+        Series or DataFrame
+
+        Notes
+        -----
+        In the vast majority of cases output and columns will only contain one
+        element. The exception is operations that expand dimensions, like ohlc.
+        """
+        indexed_output = {key.position: val for key, val in output.items()}
+        columns = Index(key.label for key in output)
+
+        result: Union[Series, DataFrame]
+        if len(output) > 1:
+            result = DataFrame(indexed_output, index=index)
+            result.columns = columns
         else:
-            name = self._selection_name
-            if name is None:
-                name = self._selected_obj.name
-            return Series(output, index=index, name=name)
+            result = Series(indexed_output[0], index=index, name=columns[0])
+
+        return result
+
+    def _wrap_aggregated_output(
+        self, output: Mapping[base.OutputKey, Union[Series, np.ndarray]]
+    ) -> Union[Series, DataFrame]:
+        """
+        Wraps the output of a SeriesGroupBy aggregation into the expected result.
 
-    def _wrap_aggregated_output(self, output, names=None):
+        Parameters
+        ----------
+        output : Mapping[base.OutputKey, Union[Series, np.ndarray]]
+            Data to wrap.
+
+        Returns
+        -------
+        Series or DataFrame
+
+        Notes
+        -----
+        In the vast majority of cases output will only contain one element.
+        The exception is operations that expand dimensions, like ohlc.
+        """
         result = self._wrap_series_output(
-            output=output, index=self.grouper.result_index, names=names
+            output=output, index=self.grouper.result_index
         )
         return self._reindex_output(result)._convert(datetime=True)
 
-    def _wrap_transformed_output(self, output, names=None):
-        return self._wrap_series_output(
-            output=output, index=self.obj.index, names=names
-        )
+    def _wrap_transformed_output(
+        self, output: Mapping[base.OutputKey, Union[Series, np.ndarray]]
+    ) -> Series:
+        """
+        Wraps the output of a SeriesGroupBy transformation into the expected result.
+
+        Parameters
+        ----------
+        output : Mapping[base.OutputKey, Union[Series, np.ndarray]]
+            Data to wrap; a single-entry mapping holding the result values.
+
+        Returns
+        -------
+        Series
+
+        Notes
+        -----
+        output should always contain one element. It is specified as a mapping
+        for consistency with DataFrame methods and _wrap_aggregated_output.
+        """
+        assert len(output) == 1
+        result = self._wrap_series_output(output=output, index=self.obj.index)
+
+        # No transformations increase the ndim of the result
+        assert isinstance(result, Series)
+        return result
 
     def _wrap_applied_output(self, keys, values, not_indexed_same=False):
         if len(keys) == 0:
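Taken on its own, the new wrapping rule is: build the container from position-keyed data first, then apply the labels, so duplicates survive. A rough standalone sketch (the names here are illustrative, not the pandas internals):

```python
import numpy as np
import pandas as pd

def wrap_output(indexed_output, labels, index):
    # Positions key the data; labels are applied afterwards, which is
    # what allows duplicates like ["a", "a"] to round-trip intact.
    if len(indexed_output) > 1:
        result = pd.DataFrame(indexed_output, index=index)
        result.columns = pd.Index(labels)
    else:
        result = pd.Series(indexed_output[0], index=index, name=labels[0])
    return result

frame = wrap_output(
    {0: np.array([1, 2]), 1: np.array([3, 4])},
    ["a", "a"],
    pd.Index(["g1", "g2"]),
)
assert list(frame.columns) == ["a", "a"]
```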
@@ -1082,17 +1155,6 @@ def _aggregate_item_by_item(self, func, *args, **kwargs) -> DataFrame:
 
         return DataFrame(result, columns=result_columns)
 
-    def _decide_output_index(self, output, labels):
-        if len(output) == len(labels):
-            output_keys = labels
-        else:
-            output_keys = sorted(output)
-
-            if isinstance(labels, MultiIndex):
-                output_keys = MultiIndex.from_tuples(output_keys, names=labels.names)
-
-        return output_keys
-
     def _wrap_applied_output(self, keys, values, not_indexed_same=False):
         if len(keys) == 0:
             return DataFrame(index=keys)
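For context on why `_decide_output_index` could be deleted: the old flow keyed `output` by label, so duplicate labels had already collapsed before this helper ever ran, and the length-mismatch branch then sorted the surviving keys. A toy demonstration of that failure mode:

```python
# With label-keyed output, the second "a" column overwrites the first,
# so a column silently vanishes before any index reconciliation happens.
labels = ["a", "a"]
output = {}
for i, label in enumerate(labels):
    output[label] = [i]
assert len(output) == 1  # one of the two "a" columns is gone
```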
@@ -1559,27 +1621,62 @@ def _insert_inaxis_grouper_inplace(self, result):
             if in_axis:
                 result.insert(0, name, lev)
 
-    def _wrap_aggregated_output(self, output, names=None):
-        agg_axis = 0 if self.axis == 1 else 1
-        agg_labels = self._obj_with_exclusions._get_axis(agg_axis)
+    def _wrap_aggregated_output(
+        self, output: Mapping[base.OutputKey, Union[Series, np.ndarray]]
+    ) -> DataFrame:
+        """
+        Wraps the output of DataFrameGroupBy aggregations into the expected result.
 
-        output_keys = self._decide_output_index(output, agg_labels)
+        Parameters
+        ----------
+        output : Mapping[base.OutputKey, Union[Series, np.ndarray]]
+            Data to wrap.
+
+        Returns
+        -------
+        DataFrame
+        """
+        indexed_output = {key.position: val for key, val in output.items()}
+        columns = Index(key.label for key in output)
+
+        result = DataFrame(indexed_output)
+        result.columns = columns
 
         if not self.as_index:
-            result = DataFrame(output, columns=output_keys)
             self._insert_inaxis_grouper_inplace(result)
             result = result._consolidate()
         else:
             index = self.grouper.result_index
-            result = DataFrame(output, index=index, columns=output_keys)
+            result.index = index
 
         if self.axis == 1:
             result = result.T
 
         return self._reindex_output(result)._convert(datetime=True)
 
-    def _wrap_transformed_output(self, output, names=None) -> DataFrame:
-        return DataFrame(output, index=self.obj.index)
+    def _wrap_transformed_output(
+        self, output: Mapping[base.OutputKey, Union[Series, np.ndarray]]
+    ) -> DataFrame:
+        """
+        Wraps the output of DataFrameGroupBy transformations into the expected result.
+
+        Parameters
+        ----------
+        output : Mapping[base.OutputKey, Union[Series, np.ndarray]]
+            Data to wrap.
+
+        Returns
+        -------
+        DataFrame
+        """
+        indexed_output = {key.position: val for key, val in output.items()}
+        columns = Index(key.label for key in output)
+
+        result = DataFrame(indexed_output)
+        result.columns = columns
+        result.index = self.obj.index
+
+        return result
 
     def _wrap_agged_blocks(self, items, blocks):
         if not self.as_index:
@@ -1699,9 +1796,11 @@ def groupby_series(obj, col=None):
         if isinstance(obj, Series):
             results = groupby_series(obj)
         else:
+            # TODO: this is duplicative of how GroupBy naturally works
+            # Try to consolidate with normal wrapping functions
             from pandas.core.reshape.concat import concat
 
-            results = [groupby_series(obj[col], col) for col in obj.columns]
+            results = [groupby_series(content, label) for label, content in obj.items()]
             results = concat(results, axis=1)
             results.columns.names = obj.columns.names
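The switch from `obj[col]` to `obj.items()` matters precisely when labels repeat: label-based selection returns every matching column at once, while `items()` walks the underlying columns one at a time.

```python
import pandas as pd

df = pd.DataFrame([[1, 2]], columns=["a", "a"])

# Selecting by a duplicated label yields a DataFrame of both columns...
assert isinstance(df["a"], pd.DataFrame)

# ...whereas items() yields each underlying column as its own Series.
for label, content in df.items():
    assert isinstance(content, pd.Series)
```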
 
@@ -1743,7 +1842,7 @@ def _normalize_keyword_aggregation(kwargs):
     """
     Normalize user-provided "named aggregation" kwargs.
 
-    Transforms from the new ``Dict[str, NamedAgg]`` style kwargs
+    Transforms from the new ``Mapping[str, NamedAgg]`` style kwargs
     to the old OrderedDict[str, List[scalar]].
 
     Parameters
@@ -1764,7 +1863,7 @@ def _normalize_keyword_aggregation(kwargs):
     >>> _normalize_keyword_aggregation({'output': ('input', 'sum')})
     (OrderedDict([('input', ['sum'])]), ('output',), [('input', 'sum')])
     """
-    # Normalize the aggregation functions as Dict[column, List[func]],
+    # Normalize the aggregation functions as Mapping[column, List[func]],
     # process normally, then fixup the names.
     # TODO(Py35): When we drop python 3.5, change this to
     # defaultdict(list)
diff --git a/pandas/core/groupby/groupby.py b/pandas/core/groupby/groupby.py
index 99a4942df4f7f..fe2e3441f2adc 100644
--- a/pandas/core/groupby/groupby.py
+++ b/pandas/core/groupby/groupby.py
@@ -7,14 +7,23 @@ class providing the base-class of operations.
 expose these user-facing objects to provide specific functionality.
 """
 
-import collections
 from contextlib import contextmanager
 import datetime
 from functools import partial, wraps
 import inspect
 import re
 import types
-from typing import FrozenSet, Iterable, List, Optional, Tuple, Type, Union
+from typing import (
+    Dict,
+    FrozenSet,
+    Iterable,
+    List,
+    Mapping,
+    Optional,
+    Tuple,
+    Type,
+    Union,
+)
 
 import numpy as np
 
@@ -41,7 +50,7 @@ class providing the base-class of operations.
 
 from pandas.core import nanops
 import pandas.core.algorithms as algorithms
-from pandas.core.arrays import Categorical, try_cast_to_ea
+from pandas.core.arrays import Categorical, DatetimeArray, try_cast_to_ea
 from pandas.core.base import DataError, PandasObject, SelectionMixin
 import pandas.core.common as com
 from pandas.core.frame import DataFrame
@@ -819,31 +828,33 @@ def _transform_should_cast(self, func_nm: str) -> bool:
         )
 
     def _cython_transform(self, how: str, numeric_only: bool = True, **kwargs):
-        output = collections.OrderedDict()  # type: dict
-        for obj in self._iterate_slices():
+        output: Dict[base.OutputKey, np.ndarray] = {}
+        for idx, obj in enumerate(self._iterate_slices()):
             name = obj.name
             is_numeric = is_numeric_dtype(obj.dtype)
             if numeric_only and not is_numeric:
                 continue
 
             try:
-                result, names = self.grouper.transform(obj.values, how, **kwargs)
+                result, _ = self.grouper.transform(obj.values, how, **kwargs)
             except NotImplementedError:
                 continue
+
             if self._transform_should_cast(how):
-                output[name] = self._try_cast(result, obj)
-            else:
-                output[name] = result
+                result = self._try_cast(result, obj)
+
+            key = base.OutputKey(label=name, position=idx)
+            output[key] = result
 
         if len(output) == 0:
             raise DataError("No numeric types to aggregate")
 
-        return self._wrap_transformed_output(output, names)
+        return self._wrap_transformed_output(output)
 
-    def _wrap_aggregated_output(self, output, names=None):
+    def _wrap_aggregated_output(self, output: Mapping[base.OutputKey, np.ndarray]):
         raise AbstractMethodError(self)
 
-    def _wrap_transformed_output(self, output, names=None):
+    def _wrap_transformed_output(self, output: Mapping[base.OutputKey, np.ndarray]):
         raise AbstractMethodError(self)
 
     def _wrap_applied_output(self, keys, values, not_indexed_same: bool = False):
@@ -852,30 +863,48 @@ def _wrap_applied_output(self, keys, values, not_indexed_same: bool = False):
     def _cython_agg_general(
         self, how: str, alt=None, numeric_only: bool = True, min_count: int = -1
     ):
-        output = {}
+        output: Dict[base.OutputKey, Union[np.ndarray, DatetimeArray]] = {}
+        # Ideally we would be able to enumerate self._iterate_slices and use
+        # the index from enumeration as the key of output, but ohlc in particular
+        # returns a (n x 4) array. Output requires 1D ndarrays as values, so we
+        # need to slice that up into 1D arrays
+        idx = 0
         for obj in self._iterate_slices():
             name = obj.name
             is_numeric = is_numeric_dtype(obj.dtype)
             if numeric_only and not is_numeric:
                 continue
 
-            result, names = self.grouper.aggregate(
+            result, agg_names = self.grouper.aggregate(
                 obj._values, how, min_count=min_count
             )
-            output[name] = self._try_cast(result, obj)
+
+            if agg_names:
+                # e.g. ohlc
+                assert len(agg_names) == result.shape[1]
+                for result_column, result_name in zip(result.T, agg_names):
+                    key = base.OutputKey(label=result_name, position=idx)
+                    output[key] = self._try_cast(result_column, obj)
+                    idx += 1
+            else:
+                assert result.ndim == 1
+                key = base.OutputKey(label=name, position=idx)
+                output[key] = self._try_cast(result, obj)
+                idx += 1
 
         if len(output) == 0:
             raise DataError("No numeric types to aggregate")
 
-        return self._wrap_aggregated_output(output, names)
+        return self._wrap_aggregated_output(output)
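The manual `idx` bookkeeping above exists because `ohlc` fans a single input column out into four output columns, so positions must advance per result column rather than per input slice:

```python
import pandas as pd

s = pd.Series([1.0, 3.0, 2.0], index=["g", "g", "g"])

# One input column becomes four labeled output columns; each gets its
# own OutputKey position in the aggregation loop above.
result = s.groupby(level=0).ohlc()
assert list(result.columns) == ["open", "high", "low", "close"]
```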
 
     def _python_agg_general(self, func, *args, **kwargs):
         func = self._is_builtin_func(func)
         f = lambda x: func(x, *args, **kwargs)
 
         # iterate through "columns" (excluding exclusions) to populate the output dict
-        output = {}
-        for obj in self._iterate_slices():
+        output: Dict[base.OutputKey, np.ndarray] = {}
+
+        for idx, obj in enumerate(self._iterate_slices()):
             name = obj.name
             if self.grouper.ngroups == 0:
                 # agg_series below assumes ngroups > 0
@@ -895,7 +924,8 @@ def _python_agg_general(self, func, *args, **kwargs):
 
             result, counts = self.grouper.agg_series(obj, f)
             assert result is not None
-            output[name] = self._try_cast(result, obj, numeric_only=True)
+            key = base.OutputKey(label=name, position=idx)
+            output[key] = self._try_cast(result, obj, numeric_only=True)
 
         if len(output) == 0:
             return self._python_apply_general(f)
@@ -903,14 +933,14 @@ def _python_agg_general(self, func, *args, **kwargs):
         if self.grouper._filter_empty_groups:
 
             mask = counts.ravel() > 0
-            for name, result in output.items():
+            for key, result in output.items():
 
                 # since we are masking, make sure that we have a float object
                 values = result
                 if is_numeric_dtype(values.dtype):
                     values = ensure_float(values)
 
-                output[name] = self._try_cast(values[mask], result)
+                output[key] = self._try_cast(values[mask], result)
 
         return self._wrap_aggregated_output(output)
 
@@ -2221,10 +2251,10 @@ def _get_cythonized_result(
         grouper = self.grouper
 
         labels, _, ngroups = grouper.group_info
-        output = collections.OrderedDict()  # type: dict
+        output: Dict[base.OutputKey, np.ndarray] = {}
         base_func = getattr(libgroupby, how)
 
-        for obj in self._iterate_slices():
+        for idx, obj in enumerate(self._iterate_slices()):
             name = obj.name
             values = obj._data._values
 
@@ -2258,7 +2288,8 @@ def _get_cythonized_result(
             if post_processing:
                 result = post_processing(result, inferences)
 
-            output[name] = result
+            key = base.OutputKey(label=name, position=idx)
+            output[key] = result
 
         if aggregate:
             return self._wrap_aggregated_output(output)
diff --git a/pandas/core/groupby/ops.py b/pandas/core/groupby/ops.py
index 47ca2b2190ecf..7fd9fb8f53134 100644
--- a/pandas/core/groupby/ops.py
+++ b/pandas/core/groupby/ops.py
@@ -424,8 +424,15 @@ def _get_cython_func_and_vals(
         return func, values
 
     def _cython_operation(
-        self, kind: str, values, how: str, axis: int, min_count: int = -1, **kwargs
-    ):
+        self, kind: str, values, how: str, axis, min_count: int = -1, **kwargs
+    ) -> Tuple[np.ndarray, Optional[List[str]]]:
+        """
+        Return the values of a cython operation as a tuple of (data, names).
+
+        ``names`` is only populated for operations that expand dimensions,
+        such as ohlc (see self._name_functions); otherwise it is None.
+        """
+
         assert kind in ["transform", "aggregate"]
         orig_values = values
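A hypothetical sketch of the `(data, names)` contract this docstring describes; the helper name and single-group assumption are illustrative only:

```python
from typing import List, Optional, Tuple

import numpy as np

def fake_cython_op(values: np.ndarray, how: str) -> Tuple[np.ndarray, Optional[List[str]]]:
    # Most kernels return a plain array and no names...
    if how != "ohlc":
        return np.cumsum(values), None
    # ...but ohlc returns an (ngroups x 4) block plus its column names
    # (here computed for a single group, for brevity).
    data = np.array([[values[0], values.max(), values.min(), values[-1]]])
    return data, ["open", "high", "low", "close"]

data, names = fake_cython_op(np.array([1.0, 3.0, 2.0]), "ohlc")
assert names is not None and data.shape[1] == len(names)
```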
 
diff --git a/pandas/tests/groupby/conftest.py b/pandas/tests/groupby/conftest.py
index af98f9efe2af9..5b8cc86513954 100644
--- a/pandas/tests/groupby/conftest.py
+++ b/pandas/tests/groupby/conftest.py
@@ -2,7 +2,7 @@
 import pytest
 
 from pandas import DataFrame, MultiIndex
-from pandas.core.groupby.base import reduction_kernels
+from pandas.core.groupby.base import reduction_kernels, transformation_kernels
 import pandas.util.testing as tm
 
 
@@ -110,3 +110,15 @@ def reduction_func(request):
     """yields the string names of all groupby reduction functions, one at a time.
     """
     return request.param
+
+
+@pytest.fixture(params=transformation_kernels)
+def transformation_func(request):
+    """yields the string names of all groupby transformation functions."""
+    return request.param
+
+
+@pytest.fixture(params=sorted(reduction_kernels) + sorted(transformation_kernels))
+def groupby_func(request):
+    """yields both aggregation and transformation functions."""
+    return request.param
diff --git a/pandas/tests/groupby/test_groupby.py b/pandas/tests/groupby/test_groupby.py
index 0d68ff36dfa20..b848e9caad9be 100644
--- a/pandas/tests/groupby/test_groupby.py
+++ b/pandas/tests/groupby/test_groupby.py
@@ -1951,3 +1951,39 @@ def test_groupby_only_none_group():
     expected = pd.Series([np.nan], name="x")
 
     tm.assert_series_equal(actual, expected)
+
+
+@pytest.mark.parametrize("bool_agg_func", ["any", "all"])
+def test_bool_aggs_dup_column_labels(bool_agg_func):
+    # GH 21668
+    df = pd.DataFrame([[True, True]], columns=["a", "a"])
+    grp_by = df.groupby([0])
+    result = getattr(grp_by, bool_agg_func)()
+
+    expected = df
+    tm.assert_frame_equal(result, expected)
+
+
+@pytest.mark.parametrize(
+    "idx", [pd.Index(["a", "a"]), pd.MultiIndex.from_tuples((("a", "a"), ("a", "a")))]
+)
+def test_dup_labels_output_shape(groupby_func, idx):
+    if groupby_func in {"size", "ngroup", "cumcount"}:
+        pytest.skip("Not applicable")
+
+    df = pd.DataFrame([[1, 1]], columns=idx)
+    grp_by = df.groupby([0])
+
+    args = []
+    if groupby_func in {"fillna", "nth"}:
+        args.append(0)
+    elif groupby_func == "corrwith":
+        args.append(df)
+    elif groupby_func == "tshift":
+        df.index = [pd.Timestamp("today")]
+        args.extend([1, "D"])
+
+    result = getattr(grp_by, groupby_func)(*args)
+
+    assert result.shape == (1, 2)
+    tm.assert_index_equal(result.columns, idx)