Source code for exporters.transform.jq_transform
import six
import yaml
from exporters.records.base_record import BaseRecord
from exporters.transform.base_transform import BaseTransform
def _compile_jq(jq_expr):
    """Build a jq program object from the filter expression ``jq_expr``.

    The ``jq`` package is imported when this function is called rather
    than when the module is loaded.

    See: https://pypi.python.org/pypi/jq
    """
    import jq as jq_module
    compiled_program = jq_module.jq(jq_expr)
    return compiled_program
class JQTransform(BaseTransform):
    """
    Transform module that runs every item through a jq filter. For the
    available jq filters and their syntax please refer to the
    `official documentation <http://stedolan.github.io/jq/manual/>`_.

        - jq_filter (str)
            Valid jq filter
    """

    # Option schema: the jq filter has to be provided as a string.
    supported_options = {
        'jq_filter': {'type': six.string_types}
    }

    def __init__(self, *args, **kwargs):
        """Read the ``jq_filter`` option, log it, then compile it."""
        super(JQTransform, self).__init__(*args, **kwargs)
        self.jq_expression = self.read_option('jq_filter')
        init_message = 'JQTransform has been initiated. Expression: {}'.format(
            self.jq_expression)
        self.logger.info(init_message)
        # Compilation happens after the log line above, so the expression
        # is already logged if compiling it fails.
        self.jq_program = _compile_jq(self.jq_expression)

    def transform_batch(self, batch):
        """Lazily yield one ``BaseRecord`` per item that the filter keeps.

        Items for which the jq program raises ``StopIteration`` are
        skipped. A result that is not a dict is passed through
        ``yaml.safe_load`` before being wrapped in a ``BaseRecord``.
        """
        for source_item in batch:
            try:
                result = self.jq_program.transform(source_item)
            except StopIteration:
                # jq.transform() raises StopIteration for filtered items
                continue
            payload = result if isinstance(result, dict) else yaml.safe_load(result)
            yield BaseRecord(payload)
        self.logger.debug('Transformed items')