diff --git a/CHANGES_NEXT_RELEASE b/CHANGES_NEXT_RELEASE index b4bfe1ff6..ceac62861 100644 --- a/CHANGES_NEXT_RELEASE +++ b/CHANGES_NEXT_RELEASE @@ -1,3 +1,4 @@ +[cygnus-ngsi][ColumnAggregator] Fix attribute type key to make it unique. (#1904) [cygnus-ngsi][PostgisSink, PostgreSQLSink] Implement records expiration for Postgis and PostgreSQL sinks (#1915) [cygnus-ngsi][ColumnAggregator] Handle metadata as json. This is to upgrade json-like sinks behaviour. (#1902) [cygnus-ngsi][CKANSink] Fix bug about bad service name used by CKAN orgName with a name mapping (#1910) diff --git a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/aggregation/NGSIGenericColumnAggregator.java b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/aggregation/NGSIGenericColumnAggregator.java index f2e8558a2..d286eff94 100644 --- a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/aggregation/NGSIGenericColumnAggregator.java +++ b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/aggregation/NGSIGenericColumnAggregator.java @@ -68,7 +68,7 @@ public void initialize(NGSIEvent event) { String attrName = contextAttribute.getName(); aggregation.put(attrName, new ArrayList()); aggregation.put(attrName + "_md", new ArrayList()); - aggregation.put(attrName + "_type", new ArrayList()); + aggregation.put(attrName + NGSIConstants.AUTOGENERATED_ATTR_TYPE, new ArrayList()); } // for setAggregation(aggregation); } // initialize @@ -127,7 +127,7 @@ public void aggregate(NGSIEvent event) { if (aggregation.containsKey(attrName)) { aggregation.get(attrName).add(attrValue); aggregation.get(attrName + "_md").add(jsonAttrMetadata); - aggregation.get(attrName + "_type").add(new JsonPrimitive(attrType)); + aggregation.get(attrName + NGSIConstants.AUTOGENERATED_ATTR_TYPE).add(new JsonPrimitive(attrType)); } else { ArrayList values = new ArrayList(Collections.nCopies(numPreviousValues, null)); values.add(attrValue); @@ -137,7 +137,7 @@ public void aggregate(NGSIEvent event) { aggregation.put(attrName + "_md", valuesMd); ArrayList valuesType = new ArrayList(Collections.nCopies(numPreviousValues, null)); valuesType.add(new JsonPrimitive(attrType)); - aggregation.put(attrName + "_type", valuesType); + aggregation.put(attrName + NGSIConstants.AUTOGENERATED_ATTR_TYPE, valuesType); } // if else } // for // Iterate on all the aggregations, checking for not updated attributes; add an empty value if missing diff --git a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIHDFSSink.java b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIHDFSSink.java index bb07af8a0..25ae54bfa 100644 --- a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIHDFSSink.java +++ b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIHDFSSink.java @@ -683,7 +683,7 @@ protected NGSIGenericAggregator processCSVFields (NGSIGenericAggregator genericA JsonElement metadata = genericAggregator.getAggregation().get(attributeName + "_md").get(i); if (genericAggregator.isAttrMetadataStore() && metadata != null && !metadata.toString().isEmpty() && !metadata.toString().contains("[]")) { String attrMdFileName = buildAttrMdFilePath(genericAggregator.getService(), genericAggregator.getServicePathForNaming(), genericAggregator.getEntityForNaming(), attributeName, - genericAggregator.getAggregation().get(attributeName + "_type").get(i).toString()); + genericAggregator.getAggregation().get(attributeName + NGSIConstants.AUTOGENERATED_ATTR_TYPE).get(i).toString()); String printableAttrMdFileName = "hdfs:///user/" + username + "/" + attrMdFileName; line += csvSeparator + printableAttrMdFileName; if (metadata.isJsonPrimitive()) { diff --git a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/utils/NGSIConstants.java b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/utils/NGSIConstants.java index a92669f14..77e340870 100644 --- a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/utils/NGSIConstants.java +++ b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/utils/NGSIConstants.java @@ -51,6 +51,8 @@ private NGSIConstants() { public static final String ATTR_VALUE = "attrValue"; public static final String ATTR_MD = "attrMd"; public static final String ATTR_MD_FILE = "attrMdFile"; + + public static final String AUTOGENERATED_ATTR_TYPE = "_attr_internal_type"; // FIWARE service and FIWARE service path specific constants public static final int SERVICE_HEADER_MAX_LEN = 50; diff --git a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/utils/NGSIUtils.java b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/utils/NGSIUtils.java index bed7bb5d8..c7acdea76 100644 --- a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/utils/NGSIUtils.java +++ b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/utils/NGSIUtils.java @@ -376,7 +376,7 @@ public static LinkedHashMap> linkedHashMapWithout Iterator it = aggregation.keySet().iterator(); while (it.hasNext()) { String entry = (String) it.next(); - if ((!attrMetadataStore && (entry.contains("_md") || entry.contains("_MD") || entry.equals(NGSIConstants.ATTR_MD)) || entry.contains("_type")) || (entry.equals(NGSIConstants.RECV_TIME_TS+"C"))) { + if ((!attrMetadataStore && (entry.contains("_md") || entry.contains("_MD") || entry.equals(NGSIConstants.ATTR_MD)) || entry.contains(NGSIConstants.AUTOGENERATED_ATTR_TYPE)) || (entry.equals(NGSIConstants.RECV_TIME_TS+"C"))) { keysToCrop.add(entry); } } @@ -400,7 +400,7 @@ public static ArrayList attributeNames(LinkedHashMap cropedList = (ArrayList) attributeNames.clone(); for (String key : cropedList) { - if (key.contains("_md") || key.contains("_type")) { + if (key.contains("_md") || key.contains(NGSIConstants.AUTOGENERATED_ATTR_TYPE)) { attributeNames.remove(key); } }