Skip to content

Commit

Permalink
handle custom request params at object init time rather than for each…
Browse files Browse the repository at this point in the history
… request
  • Loading branch information
iboates committed Jun 22, 2024
1 parent 9e72ce4 commit dda1e1e
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 50 deletions.
120 changes: 70 additions & 50 deletions geo/Geoserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,22 +67,50 @@ def __init__(
service_url: str = "http://localhost:8080/geoserver", # default deployment url during installation
username: str = "admin", # default username during geoserver installation
password: str = "geoserver", # default password during geoserver installation
request_options: Dict[Any, str] = None # additional parameters to be sent with each request
):
self.service_url = service_url
self.username = username
self.password = password
self.request_options = request_options if request_options is not None else {}

# private request method to reduce repetition of putting auth(username,password) in all requests call. DRY principle

def _requests(self, method: str, url: str, **kwargs) -> requests.Response:
if method == "post":
return requests.post(url, auth=(self.username, self.password), **kwargs)
elif method == "get":
return requests.get(url, auth=(self.username, self.password), **kwargs)
elif method == "put":
return requests.put(url, auth=(self.username, self.password), **kwargs)
elif method == "delete":
return requests.delete(url, auth=(self.username, self.password), **kwargs)
def _requests(self,
method: str,
url: str,
request_options: Dict[str, Any] = None,
**kwargs) -> requests.Response:

"""
Convenience wrapper to the requests library which automatically handles the authentication, as well as additonal
options to be passed to each request.
Attributes
----------
method : str
Which method to use (`get`, `post`, `put`, `delete`
url : str
URL to which to make the request
request_options: Dict[str, Any]
Dictionary containing key/value pairs for options to be sent with the request, which will override the ones
specified at initialization time of the Geoserver object
"""

# Some methods were historically hardcoded with specific options
if request_options is not None:
overridden_request_options = {**self.request_options, **request_options}
else:
overridden_request_options = {}

if method.lower() == "post":
return requests.post(url, auth=(self.username, self.password), **overridden_request_options, **kwargs)
elif method.lower() == "get":
return requests.get(url, auth=(self.username, self.password), **overridden_request_options, **kwargs)
elif method.lower() == "put":
return requests.put(url, auth=(self.username, self.password), **overridden_request_options, **kwargs)
elif method.lower() == "delete":
return requests.delete(url, auth=(self.username, self.password), **overridden_request_options, **kwargs)

# _______________________________________________________________________________________________
#
Expand All @@ -93,10 +121,9 @@ def _requests(self, method: str, url: str, **kwargs) -> requests.Response:
def get_manifest(self):
"""
Returns the manifest of the geoserver. The manifest is a JSON of all the loaded JARs on the GeoServer server.
"""
url = "{}/rest/about/manifest.json".format(self.service_url)
r = requests.get(url, auth=(self.username, self.password))
r = self._requests("get", url)
if r.status_code == 200:
return r.json()
else:
Expand All @@ -118,7 +145,7 @@ def get_status(self):
Returns the status of the geoserver. It shows the status details of all installed and configured modules.
"""
url = "{}/rest/about/status.json".format(self.service_url)
r = requests.get(url, auth=(self.username, self.password))
r = self._requests("get", url)
if r.status_code == 200:
return r.json()
else:
Expand All @@ -129,7 +156,7 @@ def get_system_status(self):
It returns the system status of the geoserver. It returns a list of system-level information. Major operating systems (Linux, Windows and MacOX) are supported out of the box.
"""
url = "{}/rest/about/system-status.json".format(self.service_url)
r = requests.get(url, auth=(self.username, self.password))
r = self._requests("get", url)
if r.status_code == 200:
return r.json()
else:
Expand All @@ -144,7 +171,7 @@ def reload(self):
curl -X POST http://localhost:8080/geoserver/rest/reload -H "accept: application/json" -H "content-type: application/json"
"""
url = "{}/rest/reload".format(self.service_url)
r = requests.post(url, auth=(self.username, self.password))
r = self._requests("post", url)
if r.status_code == 200:
return "Status code: {}".format(r.status_code)
else:
Expand All @@ -159,7 +186,7 @@ def reset(self):
curl -X POST http://localhost:8080/geoserver/rest/reset -H "accept: application/json" -H "content-type: application/json"
"""
url = "{}/rest/reset".format(self.service_url)
r = requests.post(url, auth=(self.username, self.password))
r = self._requests("post", url)
if r.status_code == 200:
return "Status code: {}".format(r.status_code)
else:
Expand All @@ -171,67 +198,60 @@ def reset(self):
# _______________________________________________________________________________________________
#

def get_default_workspace(self, request_options: Dict[str, Any] = None):
def get_default_workspace(self):
"""
Returns the default workspace.
"""

request_options = _parse_request_options(request_options)
url = "{}/rest/workspaces/default".format(self.service_url)
r = self._requests("get", url, **request_options)
r = self._requests("get", url)

if r.status_code == 200:
return r.json()
else:
raise GeoserverException(r.status_code, r.content)

def get_workspace(self, workspace, request_options: Dict[str, Any] = None):
def get_workspace(self, workspace):
"""
get name workspace if exist
Example: curl -v -u admin:admin -XGET -H "Accept: text/xml" http://localhost:8080/geoserver/rest/workspaces/acme.xml
"""
request_options = _parse_request_options(request_options)

# FIXME: params["recurse"]="true" was hardcoded historically, but it should be configurable now. Could be breaking change though
params = {**request_options.get("params", {}), "recurse": "true"}
request_options = {**request_options, "params": params}
request_options = {"params": {"recurse": "true"}}

url = "{}/rest/workspaces/{}.json".format(self.service_url, workspace)
r = self._requests("get", url, **request_options)
r = self._requests("get", url, request_options)

if r.status_code == 200:
return r.json()
else:
raise GeoserverException(r.status_code, r.content)

def get_workspaces(self, request_options: Dict[str, Any] = None):
def get_workspaces(self):
"""
Returns all the workspaces.
"""
request_options = _parse_request_options(request_options)
url = "{}/rest/workspaces".format(self.service_url)

r = self._requests("get", url, **request_options)
r = self._requests("get", url)

if r.status_code == 200:
return r.json()
else:
raise GeoserverException(r.status_code, r.content)

def set_default_workspace(self, workspace: str, request_options: Dict[str, Any] = None):
def set_default_workspace(self, workspace: str):
"""
Set the default workspace.
"""
url = "{}/rest/workspaces/default".format(self.service_url)
data = "<workspace><name>{}</name></workspace>".format(workspace)
request_options = _parse_request_options(request_options)

r = self._requests(
"put",
url,
data=data,
headers={"content-type": "text/xml"},
**request_options
headers={"content-type": "text/xml"}
)

if r.status_code == 200:
Expand All @@ -241,7 +261,7 @@ def set_default_workspace(self, workspace: str, request_options: Dict[str, Any]
else:
raise GeoserverException(r.status_code, r.content)

def create_workspace(self, workspace: str, **kwargs):
def create_workspace(self, workspace: str):
"""
Create a new workspace in geoserver.
Expand All @@ -250,14 +270,14 @@ def create_workspace(self, workspace: str, **kwargs):
url = "{}/rest/workspaces".format(self.service_url)
data = "<workspace><name>{}</name></workspace>".format(workspace)
headers = {"content-type": "text/xml"}
r = self._requests("post", url, data=data, headers=headers, **kwargs)
r = self._requests("post", url, data=data, headers=headers)

if r.status_code == 201:
return "{} Workspace {} created!".format(r.status_code, workspace)
else:
raise GeoserverException(r.status_code, r.content)

def delete_workspace(self, workspace: str, **kwargs):
def delete_workspace(self, workspace: str):
"""
Parameters
Expand All @@ -267,7 +287,7 @@ def delete_workspace(self, workspace: str, **kwargs):
"""
payload = {"recurse": "true"}
url = "{}/rest/workspaces/{}".format(self.service_url, workspace)
r = requests.delete(url, auth=(self.username, self.password), params=payload, **kwargs)
r = self._requests("delete", auth=(self.username, self.password), params=payload)

if r.status_code == 200:
return "Status code: {}, delete workspace".format(r.status_code)
Expand Down Expand Up @@ -315,7 +335,7 @@ def get_datastores(self, workspace: Optional[str] = None):
url = "{}/rest/workspaces/{}/datastores.json".format(
self.service_url, workspace
)
r = requests.get(url, auth=(self.username, self.password))
r = self._requests("get", auth=(self.username, self.password))
if r.status_code == 200:
return r.json()
else:
Expand Down Expand Up @@ -355,7 +375,7 @@ def get_coveragestores(self, workspace: str = None):
workspace = "default"

url = "{}/rest/workspaces/{}/coveragestores".format(self.service_url, workspace)
r = requests.get(url, auth=(self.username, self.password))
r = self._requests("get", auth=(self.username, self.password))
if r.status_code == 200:
return r.json()
else:
Expand Down Expand Up @@ -505,7 +525,7 @@ def get_layers(self, workspace: Optional[str] = None):

if workspace is not None:
url = "{}/rest/workspaces/{}/layers".format(self.service_url, workspace)
r = requests.get(url, auth=(self.username, self.password))
r = self._requests("get", auth=(self.username, self.password))
if r.status_code == 200:
return r.json()
else:
Expand Down Expand Up @@ -553,7 +573,7 @@ def get_layergroups(self, workspace: Optional[str] = None):
url = "{}/rest/workspaces/{}/layergroups".format(
self.service_url, workspace
)
r = requests.get(url, auth=(self.username, self.password))
r = self._requests("get", auth=(self.username, self.password))
if r.status_code == 200:
return r.json()
else:
Expand Down Expand Up @@ -1124,7 +1144,7 @@ def get_styles(self, workspace: Optional[str] = None):
url = "{}/rest/workspaces/{}/styles.json".format(
self.service_url, workspace
)
r = requests.get(url, auth=(self.username, self.password))
r = self._requests("get", auth=(self.username, self.password))
if r.status_code == 200:
return r.json()
else:
Expand Down Expand Up @@ -2176,7 +2196,7 @@ def get_featuretypes(self, workspace: str = None, store_name: str = None):
url = "{}/rest/workspaces/{}/datastores/{}/featuretypes.json".format(
self.service_url, workspace, store_name
)
r = requests.get(url, auth=(self.username, self.password))
r = self._requests("get", auth=(self.username, self.password))
if r.status_code == 200:
r_dict = r.json()
features = [i["name"] for i in r_dict["featureTypes"]["featureType"]]
Expand All @@ -2199,7 +2219,7 @@ def get_feature_attribute(
url = "{}/rest/workspaces/{}/datastores/{}/featuretypes/{}.json".format(
self.service_url, workspace, store_name, feature_type_name
)
r = requests.get(url, auth=(self.username, self.password))
r = self._requests("get", auth=(self.username, self.password))
if r.status_code == 200:
r_dict = r.json()
attribute = [
Expand All @@ -2221,7 +2241,7 @@ def get_featurestore(self, store_name: str, workspace: str):
url = "{}/rest/workspaces/{}/datastores/{}".format(
self.service_url, workspace, store_name
)
r = requests.get(url, auth=(self.username, self.password))
r = self._requests("get", auth=(self.username, self.password))
if r.status_code == 200:
r_dict = r.json()
return r_dict["dataStore"]
Expand All @@ -2245,7 +2265,7 @@ def delete_featurestore(
)
if workspace is None:
url = "{}/datastores/{}".format(self.service_url, featurestore_name)
r = requests.delete(url, auth=(self.username, self.password), params=payload)
r = self._requests("delete", auth=(self.username, self.password), params=payload)

if r.status_code == 200:
return "Status code: {}, delete featurestore".format(r.status_code)
Expand Down Expand Up @@ -2273,7 +2293,7 @@ def delete_coveragestore(
self.service_url, coveragestore_name
)

r = requests.delete(url, auth=(self.username, self.password), params=payload)
r = self._requests("delete", auth=(self.username, self.password), params=payload)

if r.status_code == 200:
return "Coverage store deleted successfully"
Expand Down Expand Up @@ -2302,7 +2322,7 @@ def get_all_users(self, service=None):
url += "service/{}/users/".format(service)

headers = {"accept": "application/xml"}
r = requests.get(url, auth=(self.username, self.password), headers=headers)
r = self._requests("get", auth=(self.username, self.password), headers=headers)

if r.status_code == 200:
return parse(r.content)
Expand Down Expand Up @@ -2403,7 +2423,7 @@ def delete_user(self, username: str, service=None):
url += "service/{}/user/{}".format(service, username)

headers = {"accept": "application/json"}
r = requests.delete(url, auth=(self.username, self.password), headers=headers)
r = self._requests("delete", auth=(self.username, self.password), headers=headers)

if r.status_code == 200:
return "User deleted successfully"
Expand All @@ -2426,7 +2446,7 @@ def get_all_usergroups(self, service=None):
else:
url += "service/{}/groups/".format(service)

r = requests.get(url, auth=(self.username, self.password))
r = self._requests("get", auth=(self.username, self.password))

if r.status_code == 200:
return parse(r.content)
Expand All @@ -2449,7 +2469,7 @@ def create_usergroup(self, group: str, service=None):
url += "group/{}".format(group)
else:
url += "service/{}/group/{}".format(service, group)
r = requests.post(url, auth=(self.username, self.password))
r = self._requests("post", auth=(self.username, self.password))

if r.status_code == 201:
return "Group created successfully"
Expand All @@ -2473,7 +2493,7 @@ def delete_usergroup(self, group: str, service=None):
else:
url += "service/{}/group/{}".format(service, group)

r = requests.delete(url, auth=(self.username, self.password))
r = self._requests("delete", auth=(self.username, self.password))

if r.status_code == 200:
return "Group deleted successfully"
Expand Down
1 change: 1 addition & 0 deletions tests/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,6 @@ services:
environment:
- GEOSERVER_ADMIN_PASSWORD=geoserver
- GEOSERVER_ADMIN_USER=admin
- SAMPLE_DATA=true
ports:
- "0.0.0.0:8080:8080"
Loading

0 comments on commit dda1e1e

Please sign in to comment.