From dc4d36d3924ceccfe458d1ca3eac22756b859d21 Mon Sep 17 00:00:00 2001 From: Ivan Hernandez Date: Wed, 2 Sep 2020 12:00:16 +0200 Subject: [PATCH 1/3] fix attribute type key to make it unique --- .../iot/cygnus/aggregation/NGSIGenericColumnAggregator.java | 6 +++--- .../java/com/telefonica/iot/cygnus/sinks/NGSIHDFSSink.java | 2 +- .../java/com/telefonica/iot/cygnus/utils/NGSIUtils.java | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/aggregation/NGSIGenericColumnAggregator.java b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/aggregation/NGSIGenericColumnAggregator.java index f2e8558a2..85e828df5 100644 --- a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/aggregation/NGSIGenericColumnAggregator.java +++ b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/aggregation/NGSIGenericColumnAggregator.java @@ -68,7 +68,7 @@ public void initialize(NGSIEvent event) { String attrName = contextAttribute.getName(); aggregation.put(attrName, new ArrayList()); aggregation.put(attrName + "_md", new ArrayList()); - aggregation.put(attrName + "_type", new ArrayList()); + aggregation.put(attrName + "_typeAut0_G3n3r47eD_", new ArrayList()); } // for setAggregation(aggregation); } // initialize @@ -127,7 +127,7 @@ public void aggregate(NGSIEvent event) { if (aggregation.containsKey(attrName)) { aggregation.get(attrName).add(attrValue); aggregation.get(attrName + "_md").add(jsonAttrMetadata); - aggregation.get(attrName + "_type").add(new JsonPrimitive(attrType)); + aggregation.get(attrName + "_typeAut0_G3n3r47eD_").add(new JsonPrimitive(attrType)); } else { ArrayList values = new ArrayList(Collections.nCopies(numPreviousValues, null)); values.add(attrValue); @@ -137,7 +137,7 @@ public void aggregate(NGSIEvent event) { aggregation.put(attrName + "_md", valuesMd); ArrayList valuesType = new ArrayList(Collections.nCopies(numPreviousValues, null)); valuesType.add(new JsonPrimitive(attrType)); - aggregation.put(attrName + "_type", valuesType); + aggregation.put(attrName + "_typeAut0_G3n3r47eD_", valuesType); } // if else } // for // Iterate on all the aggregations, checking for not updated attributes; add an empty value if missing diff --git a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIHDFSSink.java b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIHDFSSink.java index bb07af8a0..1fb50925d 100644 --- a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIHDFSSink.java +++ b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIHDFSSink.java @@ -683,7 +683,7 @@ protected NGSIGenericAggregator processCSVFields (NGSIGenericAggregator genericA JsonElement metadata = genericAggregator.getAggregation().get(attributeName + "_md").get(i); if (genericAggregator.isAttrMetadataStore() && metadata != null && !metadata.toString().isEmpty() && !metadata.toString().contains("[]")) { String attrMdFileName = buildAttrMdFilePath(genericAggregator.getService(), genericAggregator.getServicePathForNaming(), genericAggregator.getEntityForNaming(), attributeName, - genericAggregator.getAggregation().get(attributeName + "_type").get(i).toString()); + genericAggregator.getAggregation().get(attributeName + "_typeAut0_G3n3r47eD_").get(i).toString()); String printableAttrMdFileName = "hdfs:///user/" + username + "/" + attrMdFileName; line += csvSeparator + printableAttrMdFileName; if (metadata.isJsonPrimitive()) { diff --git a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/utils/NGSIUtils.java b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/utils/NGSIUtils.java index bed7bb5d8..4492c81f5 100644 --- a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/utils/NGSIUtils.java +++ b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/utils/NGSIUtils.java @@ -376,7 +376,7 @@ public static LinkedHashMap> linkedHashMapWithout Iterator it = aggregation.keySet().iterator(); while (it.hasNext()) { String entry = (String) it.next(); - if ((!attrMetadataStore && (entry.contains("_md") || entry.contains("_MD") || entry.equals(NGSIConstants.ATTR_MD)) || entry.contains("_type")) || (entry.equals(NGSIConstants.RECV_TIME_TS+"C"))) { + if ((!attrMetadataStore && (entry.contains("_md") || entry.contains("_MD") || entry.equals(NGSIConstants.ATTR_MD)) || entry.contains("_typeAut0_G3n3r47eD_")) || (entry.equals(NGSIConstants.RECV_TIME_TS+"C"))) { keysToCrop.add(entry); } } @@ -400,7 +400,7 @@ public static ArrayList attributeNames(LinkedHashMap cropedList = (ArrayList) attributeNames.clone(); for (String key : cropedList) { - if (key.contains("_md") || key.contains("_type")) { + if (key.contains("_md") || key.contains("_typeAut0_G3n3r47eD_")) { attributeNames.remove(key); } } From 90252a76f388609b34698fb90d948b300adbe28f Mon Sep 17 00:00:00 2001 From: Ivan Hernandez Date: Wed, 2 Sep 2020 12:01:11 +0200 Subject: [PATCH 2/3] add CNR entry --- CHANGES_NEXT_RELEASE | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES_NEXT_RELEASE b/CHANGES_NEXT_RELEASE index f34e91592..74e5b06d9 100644 --- a/CHANGES_NEXT_RELEASE +++ b/CHANGES_NEXT_RELEASE @@ -1,3 +1,4 @@ +[cygnus-ngsi][ColumnAggregator] Fix attribute type key to make it unique. (#1904) [cygnus-ngsi][ColumnAggregator] Handle metadata as json. This is to upgrade json-like sinks behaviour. (#1902) [cygnus-ngsi][CKANSink] Fix bug about bad service name used by CKAN orgName with a name mapping (#1910) [cygnus-ngsi][CKANSink] Add flag option to disable metatada in CKAN (#1912) From ab16f1ce2ac550c0deb41b66330ce6c287f1027e Mon Sep 17 00:00:00 2001 From: Ivan Hernandez Date: Wed, 2 Sep 2020 12:13:54 +0200 Subject: [PATCH 3/3] create a attr type constant --- .../iot/cygnus/aggregation/NGSIGenericColumnAggregator.java | 6 +++--- .../java/com/telefonica/iot/cygnus/sinks/NGSIHDFSSink.java | 2 +- .../java/com/telefonica/iot/cygnus/utils/NGSIConstants.java | 2 ++ .../java/com/telefonica/iot/cygnus/utils/NGSIUtils.java | 4 ++-- 4 files changed, 8 insertions(+), 6 deletions(-) diff --git a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/aggregation/NGSIGenericColumnAggregator.java b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/aggregation/NGSIGenericColumnAggregator.java index 85e828df5..d286eff94 100644 --- a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/aggregation/NGSIGenericColumnAggregator.java +++ b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/aggregation/NGSIGenericColumnAggregator.java @@ -68,7 +68,7 @@ public void initialize(NGSIEvent event) { String attrName = contextAttribute.getName(); aggregation.put(attrName, new ArrayList()); aggregation.put(attrName + "_md", new ArrayList()); - aggregation.put(attrName + "_typeAut0_G3n3r47eD_", new ArrayList()); + aggregation.put(attrName + NGSIConstants.AUTOGENERATED_ATTR_TYPE, new ArrayList()); } // for setAggregation(aggregation); } // initialize @@ -127,7 +127,7 @@ public void aggregate(NGSIEvent event) { if (aggregation.containsKey(attrName)) { aggregation.get(attrName).add(attrValue); aggregation.get(attrName + "_md").add(jsonAttrMetadata); - aggregation.get(attrName + "_typeAut0_G3n3r47eD_").add(new JsonPrimitive(attrType)); + aggregation.get(attrName + NGSIConstants.AUTOGENERATED_ATTR_TYPE).add(new JsonPrimitive(attrType)); } else { ArrayList values = new ArrayList(Collections.nCopies(numPreviousValues, null)); values.add(attrValue); @@ -137,7 +137,7 @@ public void aggregate(NGSIEvent event) { aggregation.put(attrName + "_md", valuesMd); ArrayList valuesType = new ArrayList(Collections.nCopies(numPreviousValues, null)); valuesType.add(new JsonPrimitive(attrType)); - aggregation.put(attrName + "_typeAut0_G3n3r47eD_", valuesType); + aggregation.put(attrName + NGSIConstants.AUTOGENERATED_ATTR_TYPE, valuesType); } // if else } // for // Iterate on all the aggregations, checking for not updated attributes; add an empty value if missing diff --git a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIHDFSSink.java b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIHDFSSink.java index 1fb50925d..25ae54bfa 100644 --- a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIHDFSSink.java +++ b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIHDFSSink.java @@ -683,7 +683,7 @@ protected NGSIGenericAggregator processCSVFields (NGSIGenericAggregator genericA JsonElement metadata = genericAggregator.getAggregation().get(attributeName + "_md").get(i); if (genericAggregator.isAttrMetadataStore() && metadata != null && !metadata.toString().isEmpty() && !metadata.toString().contains("[]")) { String attrMdFileName = buildAttrMdFilePath(genericAggregator.getService(), genericAggregator.getServicePathForNaming(), genericAggregator.getEntityForNaming(), attributeName, - genericAggregator.getAggregation().get(attributeName + "_typeAut0_G3n3r47eD_").get(i).toString()); + genericAggregator.getAggregation().get(attributeName + NGSIConstants.AUTOGENERATED_ATTR_TYPE).get(i).toString()); String printableAttrMdFileName = "hdfs:///user/" + username + "/" + attrMdFileName; line += csvSeparator + printableAttrMdFileName; if (metadata.isJsonPrimitive()) { diff --git a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/utils/NGSIConstants.java b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/utils/NGSIConstants.java index a92669f14..77e340870 100644 --- a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/utils/NGSIConstants.java +++ b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/utils/NGSIConstants.java @@ -51,6 +51,8 @@ private NGSIConstants() { public static final String ATTR_VALUE = "attrValue"; public static final String ATTR_MD = "attrMd"; public static final String ATTR_MD_FILE = "attrMdFile"; + + public static final String AUTOGENERATED_ATTR_TYPE = "_attr_internal_type"; // FIWARE service and FIWARE service path specific constants public static final int SERVICE_HEADER_MAX_LEN = 50; diff --git a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/utils/NGSIUtils.java b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/utils/NGSIUtils.java index 4492c81f5..c7acdea76 100644 --- a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/utils/NGSIUtils.java +++ b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/utils/NGSIUtils.java @@ -376,7 +376,7 @@ public static LinkedHashMap> linkedHashMapWithout Iterator it = aggregation.keySet().iterator(); while (it.hasNext()) { String entry = (String) it.next(); - if ((!attrMetadataStore && (entry.contains("_md") || entry.contains("_MD") || entry.equals(NGSIConstants.ATTR_MD)) || entry.contains("_typeAut0_G3n3r47eD_")) || (entry.equals(NGSIConstants.RECV_TIME_TS+"C"))) { + if ((!attrMetadataStore && (entry.contains("_md") || entry.contains("_MD") || entry.equals(NGSIConstants.ATTR_MD)) || entry.contains(NGSIConstants.AUTOGENERATED_ATTR_TYPE)) || (entry.equals(NGSIConstants.RECV_TIME_TS+"C"))) { keysToCrop.add(entry); } } @@ -400,7 +400,7 @@ public static ArrayList attributeNames(LinkedHashMap cropedList = (ArrayList) attributeNames.clone(); for (String key : cropedList) { - if (key.contains("_md") || key.contains("_typeAut0_G3n3r47eD_")) { + if (key.contains("_md") || key.contains(NGSIConstants.AUTOGENERATED_ATTR_TYPE)) { attributeNames.remove(key); } }