Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/attribute type key (workaround) #1920

Merged
merged 4 commits into from
Sep 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES_NEXT_RELEASE
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
[cygnus-ngsi][ColumnAggregator] Fix attribute type key to make it unique. (#1904)
[cygnus-ngsi][PostgisSink, PostgreSQLSink] Implement records expiration for Postgis and PostgreSQL sinks (#1915)
[cygnus-ngsi][ColumnAggregator] Handle metadata as json. This is to upgrade json-like sinks behaviour. (#1902)
[cygnus-ngsi][CKANSink] Fix bug about bad service name used by CKAN orgName with a name mapping (#1910)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void initialize(NGSIEvent event) {
String attrName = contextAttribute.getName();
aggregation.put(attrName, new ArrayList<JsonElement>());
aggregation.put(attrName + "_md", new ArrayList<JsonElement>());
aggregation.put(attrName + "_type", new ArrayList<JsonElement>());
aggregation.put(attrName + NGSIConstants.AUTOGENERATED_ATTR_TYPE, new ArrayList<JsonElement>());
} // for
setAggregation(aggregation);
} // initialize
Expand Down Expand Up @@ -127,7 +127,7 @@ public void aggregate(NGSIEvent event) {
if (aggregation.containsKey(attrName)) {
aggregation.get(attrName).add(attrValue);
aggregation.get(attrName + "_md").add(jsonAttrMetadata);
aggregation.get(attrName + "_type").add(new JsonPrimitive(attrType));
aggregation.get(attrName + NGSIConstants.AUTOGENERATED_ATTR_TYPE).add(new JsonPrimitive(attrType));
} else {
ArrayList<JsonElement> values = new ArrayList<JsonElement>(Collections.nCopies(numPreviousValues, null));
values.add(attrValue);
Expand All @@ -137,7 +137,7 @@ public void aggregate(NGSIEvent event) {
aggregation.put(attrName + "_md", valuesMd);
ArrayList<JsonElement> valuesType = new ArrayList<JsonElement>(Collections.nCopies(numPreviousValues, null));
valuesType.add(new JsonPrimitive(attrType));
aggregation.put(attrName + "_type", valuesType);
aggregation.put(attrName + NGSIConstants.AUTOGENERATED_ATTR_TYPE, valuesType);
} // if else
} // for
// Iterate on all the aggregations, checking for not updated attributes; add an empty value if missing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -683,7 +683,7 @@ protected NGSIGenericAggregator processCSVFields (NGSIGenericAggregator genericA
JsonElement metadata = genericAggregator.getAggregation().get(attributeName + "_md").get(i);
if (genericAggregator.isAttrMetadataStore() && metadata != null && !metadata.toString().isEmpty() && !metadata.toString().contains("[]")) {
String attrMdFileName = buildAttrMdFilePath(genericAggregator.getService(), genericAggregator.getServicePathForNaming(), genericAggregator.getEntityForNaming(), attributeName,
genericAggregator.getAggregation().get(attributeName + "_type").get(i).toString());
genericAggregator.getAggregation().get(attributeName + NGSIConstants.AUTOGENERATED_ATTR_TYPE).get(i).toString());
String printableAttrMdFileName = "hdfs:///user/" + username + "/" + attrMdFileName;
line += csvSeparator + printableAttrMdFileName;
if (metadata.isJsonPrimitive()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ private NGSIConstants() {
public static final String ATTR_VALUE = "attrValue";
public static final String ATTR_MD = "attrMd";
public static final String ATTR_MD_FILE = "attrMdFile";

public static final String AUTOGENERATED_ATTR_TYPE = "_attr_internal_type";

// FIWARE service and FIWARE service path specific constants
public static final int SERVICE_HEADER_MAX_LEN = 50;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ public static LinkedHashMap<String, ArrayList<JsonElement>> linkedHashMapWithout
Iterator<String> it = aggregation.keySet().iterator();
while (it.hasNext()) {
String entry = (String) it.next();
if ((!attrMetadataStore && (entry.contains("_md") || entry.contains("_MD") || entry.equals(NGSIConstants.ATTR_MD)) || entry.contains("_type")) || (entry.equals(NGSIConstants.RECV_TIME_TS+"C"))) {
if ((!attrMetadataStore && (entry.contains("_md") || entry.contains("_MD") || entry.equals(NGSIConstants.ATTR_MD)) || entry.contains(NGSIConstants.AUTOGENERATED_ATTR_TYPE)) || (entry.equals(NGSIConstants.RECV_TIME_TS+"C"))) {
keysToCrop.add(entry);
}
}
Expand All @@ -400,7 +400,7 @@ public static ArrayList<String> attributeNames(LinkedHashMap<String, ArrayList<J
attributeNames.remove(NGSIConstants.ENTITY_TYPE);
ArrayList <String> cropedList = (ArrayList<String>) attributeNames.clone();
for (String key : cropedList) {
if (key.contains("_md") || key.contains("_type")) {
if (key.contains("_md") || key.contains(NGSIConstants.AUTOGENERATED_ATTR_TYPE)) {
attributeNames.remove(key);
}
}
Expand Down