Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions UPDATING.md
Original file line number Diff line number Diff line change
Expand Up @@ -2293,14 +2293,11 @@ dags_are_paused_at_creation = False

If you specify a hive conf to the run_cli command of the HiveHook, Airflow add some
convenience variables to the config. In case you run a secure Hadoop setup it might be
required to whitelist these variables by adding the following to your configuration:
required to allow these variables by adjusting your Hive configuration to add `airflow\.ctx\..*` to the regex
of user-editable configuration properties. See
[the Hive docs on Configuration Properties][hive.security.authorization.sqlstd] for more info.

```
<property>
<name>hive.security.authorization.sqlstd.confwhitelist.append</name>
<value>airflow\.ctx\..*</value>
</property>
```
[hive.security.authorization.sqlstd]: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=82903061#ConfigurationProperties-SQLStandardBasedAuthorization.1

### Google Cloud Operator and Hook alignment

Expand Down
12 changes: 6 additions & 6 deletions airflow/contrib/plugins/metastore_browser/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@
PRESTO_CONN_ID = 'presto_default'
HIVE_CLI_CONN_ID = 'hive_default'
DEFAULT_DB = 'default'
DB_WHITELIST = [] # type: List[str]
DB_BLACKLIST = ['tmp'] # type: List[str]
DB_ALLOW_LIST = [] # type: List[str]
DB_DENY_LIST = ['tmp'] # type: List[str]
TABLE_SELECTOR_LIMIT = 2000

# Keeping pandas from truncating long strings
Expand Down Expand Up @@ -139,11 +139,11 @@ def objects(self):
Retrieve objects from TBLS and DBS
"""
where_clause = ''
if DB_WHITELIST:
dbs = ",".join(["'" + db + "'" for db in DB_WHITELIST])
if DB_ALLOW_LIST:
dbs = ",".join(["'" + db + "'" for db in DB_ALLOW_LIST])
where_clause = "AND b.name IN ({})".format(dbs)
if DB_BLACKLIST:
dbs = ",".join(["'" + db + "'" for db in DB_BLACKLIST])
if DB_DENY_LIST:
dbs = ",".join(["'" + db + "'" for db in DB_DENY_LIST])
where_clause = "AND b.name NOT IN ({})".format(dbs)
sql = """
SELECT CONCAT(b.NAME, '.', a.TBL_NAME), TBL_TYPE
Expand Down
20 changes: 10 additions & 10 deletions airflow/jobs/scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ class DagFileProcessorProcess(AbstractDagFileProcessorProcess, LoggingMixin, Mul
:type file_path: str
:param pickle_dags: whether to serialize the DAG objects to the DB
:type pickle_dags: bool
:param dag_id_white_list: If specified, only look at these DAG ID's
:type dag_id_white_list: List[str]
:param dag_ids: If specified, only look at these DAG ID's
:type dag_ids: List[str]
:param failure_callback_requests: failure callback to execute
:type failure_callback_requests: List[airflow.utils.dag_processing.FailureCallbackRequest]
"""
Expand All @@ -83,13 +83,13 @@ def __init__(
self,
file_path: str,
pickle_dags: bool,
dag_id_white_list: Optional[List[str]],
dag_ids: Optional[List[str]],
failure_callback_requests: List[FailureCallbackRequest]
):
super().__init__()
self._file_path = file_path
self._pickle_dags = pickle_dags
self._dag_id_white_list = dag_id_white_list
self._dag_ids = dag_ids
self._failure_callback_requests = failure_callback_requests

# The process that was launched to process the given file.
Expand All @@ -116,7 +116,7 @@ def file_path(self):
def _run_file_processor(result_channel,
file_path,
pickle_dags,
dag_id_white_list,
dag_ids,
thread_name,
failure_callback_requests):
"""
Expand All @@ -129,9 +129,9 @@ def _run_file_processor(result_channel,
:param pickle_dags: whether to pickle the DAGs found in the file and
save them to the DB
:type pickle_dags: bool
:param dag_id_white_list: if specified, only examine DAG ID's that are
:param dag_ids: if specified, only examine DAG ID's that are
in this list
:type dag_id_white_list: list[str]
:type dag_ids: list[str]
:param thread_name: the name to use for the process that is launched
:type thread_name: str
:param failure_callback_requests: failure callback to execute
Expand Down Expand Up @@ -160,7 +160,7 @@ def _run_file_processor(result_channel,
start_time = time.time()

log.info("Started process (PID=%s) to work on %s", os.getpid(), file_path)
dag_file_processor = DagFileProcessor(dag_ids=dag_id_white_list, log=log)
dag_file_processor = DagFileProcessor(dag_ids=dag_ids, log=log)
result = dag_file_processor.process_file(
file_path=file_path,
pickle_dags=pickle_dags,
Expand Down Expand Up @@ -195,7 +195,7 @@ def start(self):
_child_channel,
self.file_path,
self._pickle_dags,
self._dag_id_white_list,
self._dag_ids,
"DagFileProcessor{}".format(self._instance_id),
self._failure_callback_requests
),
Expand Down Expand Up @@ -1583,7 +1583,7 @@ def _create_dag_file_processor(file_path, failure_callback_requests, dag_ids, pi
return DagFileProcessorProcess(
file_path=file_path,
pickle_dags=pickle_dags,
dag_id_white_list=dag_ids,
dag_ids=dag_ids,
failure_callback_requests=failure_callback_requests
)

Expand Down
22 changes: 16 additions & 6 deletions airflow/providers/apache/hive/operators/hive_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
# under the License.

import json
import warnings
from collections import OrderedDict
from typing import Callable, Dict, List, Optional

Expand Down Expand Up @@ -48,9 +49,9 @@ class HiveStatsCollectionOperator(BaseOperator):
:param extra_exprs: dict of expression to run against the table where
keys are metric names and values are Presto compatible expressions
:type extra_exprs: dict
:param col_blacklist: list of columns to blacklist, consider
blacklisting blobs, large json columns, ...
:type col_blacklist: list
:param excluded_columns: list of columns to exclude, consider
excluding blobs, large json columns, ...
:type excluded_columns: list
:param assignment_func: a function that receives a column name and
a type, and returns a dict of metric names and an Presto expressions.
If None is returned, the global defaults are applied. If an
Expand All @@ -67,17 +68,26 @@ def __init__(self,
table: str,
partition: str,
extra_exprs: Optional[Dict] = None,
col_blacklist: Optional[List] = None,
excluded_columns: Optional[List] = None,
assignment_func: Optional[Callable[[str, str], Optional[Dict]]] = None,
metastore_conn_id: str = 'metastore_default',
presto_conn_id: str = 'presto_default',
mysql_conn_id: str = 'airflow_db',
*args, **kwargs) -> None:
if 'col_blacklist' in kwargs:
warnings.warn(
'col_blacklist kwarg passed to {c} (task_id: {t}) is deprecated, please rename it to '
'excluded_columns instead'.format(
c=self.__class__.__name__, t=kwargs.get('task_id')),
category=FutureWarning,
stacklevel=2
)
excluded_columns = kwargs.pop('col_blacklist')
super().__init__(*args, **kwargs)
self.table = table
self.partition = partition
self.extra_exprs = extra_exprs or {}
self.col_blacklist = col_blacklist or [] # type: List
self.excluded_columns = excluded_columns or [] # type: List
self.metastore_conn_id = metastore_conn_id
self.presto_conn_id = presto_conn_id
self.mysql_conn_id = mysql_conn_id
Expand All @@ -89,7 +99,7 @@ def get_default_exprs(self, col, col_type):
"""
Get default expressions
"""
if col in self.col_blacklist:
if col in self.excluded_columns:
return {}
exp = {(col, 'non_null'): f"COUNT({col})"}
if col_type in ['double', 'int', 'bigint', 'float']:
Expand Down
4 changes: 2 additions & 2 deletions pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
# run arbitrary code.
extension-pkg-whitelist=setproctitle

# Add files or directories to the blacklist. They should be base names, not
# Add files or directories to the ignore list. They should be base names, not
# paths.
ignore=CVS

# Add files or directories matching the regex patterns to the blacklist. The
# Add files or directories matching the regex patterns to the ignore list. The
# regex matches against base names, not paths.
ignore-patterns=

Expand Down
2 changes: 1 addition & 1 deletion tests/providers/apache/cassandra/hooks/test_cassandra.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def test_get_lb_policy_invalid_policy(self):
TokenAwarePolicy,
expected_child_policy_type=RoundRobinPolicy)

def test_get_lb_policy_no_host_for_white_list(self):
def test_get_lb_policy_no_host_for_allow_list(self):
# test host not specified for WhiteListRoundRobinPolicy should throw exception
self._assert_get_lb_policy('WhiteListRoundRobinPolicy',
{},
Expand Down
6 changes: 3 additions & 3 deletions tests/providers/apache/hive/operators/test_hive_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ def test_get_default_exprs(self):
(col, 'non_null'): 'COUNT({})'.format(col)
})

def test_get_default_exprs_blacklist(self):
col = 'blacklisted_col'
self.kwargs.update(dict(col_blacklist=[col]))
def test_get_default_exprs_excluded_cols(self):
col = 'excluded_col'
self.kwargs.update(dict(excluded_columns=[col]))

default_exprs = HiveStatsCollectionOperator(**self.kwargs).get_default_exprs(col, None)

Expand Down
4 changes: 2 additions & 2 deletions tests/test_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def test_stat_name_must_not_exceed_max_length(self):
self.stats.incr('X' * 300)
self.statsd_client.assert_not_called()

def test_stat_name_must_only_include_whitelisted_characters(self):
def test_stat_name_must_only_include_allowed_characters(self):
self.stats.incr('test/$tats')
self.statsd_client.assert_not_called()

Expand Down Expand Up @@ -159,7 +159,7 @@ def test_stat_name_must_not_exceed_max_length_with_dogstatsd(self):
self.dogstatsd.incr('X' * 300)
self.dogstatsd_client.assert_not_called()

def test_stat_name_must_only_include_whitelisted_characters_with_dogstatsd(self):
def test_stat_name_must_only_include_allowed_characters_with_dogstatsd(self):
self.dogstatsd.incr('test/$tats')
self.dogstatsd_client.assert_not_called()

Expand Down
4 changes: 2 additions & 2 deletions tests/utils/test_dag_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@
class FakeDagFileProcessorRunner(DagFileProcessorProcess):
# This fake processor will return the zombies it received in constructor
# as its processing result w/o actually parsing anything.
def __init__(self, file_path, pickle_dags, dag_id_white_list, zombies):
super().__init__(file_path, pickle_dags, dag_id_white_list, zombies)
def __init__(self, file_path, pickle_dags, dag_ids, zombies):
super().__init__(file_path, pickle_dags, dag_ids, zombies)
# We need a "real" selectable handle for waitable_handle to work
readable, writable = multiprocessing.Pipe(duplex=False)
writable.send('abc')
Expand Down