tsunami-security-scanner/plugin_server/py/plugin/payload/payload_generator_test.py

388 lines
15 KiB
Python

"""Tests for google3.third_party.java_src.tsunami.plugin_server.py.plugin.payload.payload_generator."""
import hashlib
import unittest
from unittest import mock

from absl.testing import absltest
from absl.testing import parameterized
from google.protobuf import wrappers_pb2
import requests_mock

from common.net.http.requests_http_client import RequestsHttpClientBuilder
from plugin.payload.payload_generator import PayloadGenerator
from plugin.payload.payload_generator_test_helper import ANY_SSRF_CONFIG
from plugin.payload.payload_generator_test_helper import JAVA_REFLECTIVE_RCE_CONFIG
from plugin.payload.payload_generator_test_helper import LINUX_REFLECTIVE_RCE_CONFIG
from plugin.payload.payload_generator_test_helper import LINUX_UNSPECIFIED_CONFIG
from plugin.payload.payload_secret_generator import PayloadSecretGenerator
from plugin.payload.payload_utility import get_parsed_payload
from plugin.tcs_client import TcsClient
import payload_generator_pb2 as pg
# First and second arguments handed to TcsClient(...) in
# PayloadGeneratorWithCallbackTest.setUpClass; callback payloads are asserted
# to contain both values.
_IP_ADDRESS = '127.0.0.1'
_PORT = 8000
# Third TcsClient argument; also the URL the requests_mock GET stubs answer on.
_URL = 'http://valid.com'
# Fixed value returned by the stubbed PayloadSecretGenerator.generate, so the
# expected payload strings in the tests are deterministic.
_SECRET = '_1234_HERE_IT_IS_'
class PayloadGeneratorWithCallbackTest(parameterized.TestCase):
  """Tests PayloadGenerator when it is built with a configured TcsClient.

  The generator under test is created once per class from:
    * a PayloadSecretGenerator whose `generate` is stubbed to return `_SECRET`,
    * the payload definitions returned by `get_parsed_payload()`,
    * a TcsClient built from `_IP_ADDRESS`, `_PORT` and `_URL`.
  """

  @classmethod
  def setUpClass(cls):
    """Builds the shared PayloadGenerator used by every test in this class."""
    super().setUpClass()
    secret_generator = PayloadSecretGenerator()
    # Pin the secret so the expected payload strings below are deterministic.
    secret_generator.generate = mock.MagicMock(return_value=_SECRET)
    tcs_client = TcsClient(
        _IP_ADDRESS, _PORT, _URL, RequestsHttpClientBuilder().build()
    )
    parsed_payloads = get_parsed_payload()
    cls.payload_generator = PayloadGenerator(
        secret_generator, parsed_payloads, tcs_client
    )

  def _register_callback_response(self, req_mock, body):
    """Makes GET requests to `_URL` answer with `body` (UTF-8 encoded)."""
    req_mock.register_uri('GET', _URL, content=body.encode('utf-8'))

  def _assert_generate_raises_lookup_error(self, config, expected_message):
    """Asserts generate(config) raises LookupError with the exact message."""
    with self.assertRaises(LookupError) as raised:
      self.payload_generator.generate(config)
    # Checked after the `with` block: inside it the line would never run.
    self.assertEqual(expected_message, str(raised.exception))

  def test_is_callback_server_enabled_returns_true(self):
    """The generator reports the callback server as enabled."""
    self.assertTrue(self.payload_generator.is_callback_server_enabled())

  @parameterized.named_parameters(
      ('linux_config', LINUX_REFLECTIVE_RCE_CONFIG, 'curl'),
      (
          'ssrf_config',
          ANY_SSRF_CONFIG,
          hashlib.sha3_224(_SECRET.encode('utf-8')).hexdigest(),
      ),
  )
  def test_generate_with_callback_returns_payload(
      self, config, expected_payload
  ):
    """generate() yields a callback payload embedding the TCS address/port."""
    generated = self.payload_generator.generate(config)
    payload_text = generated.payload
    for fragment in (expected_payload, _IP_ADDRESS, str(_PORT)):
      self.assertIn(fragment, payload_text)
    self.assertTrue(generated.get_payload_attributes().uses_callback_server)

  @parameterized.named_parameters(
      (
          'linux_config',
          LINUX_REFLECTIVE_RCE_CONFIG,
          f'printf %s%s%s TSUNAMI_PAYLOAD_START {_SECRET} TSUNAMI_PAYLOAD_END',
      ),
      (
          'ssrf_config',
          ANY_SSRF_CONFIG,
          'http://public-firing-range.appspot.com/',
      ),
      (
          'java_config',
          JAVA_REFLECTIVE_RCE_CONFIG,
          (
              'String.format("%s%s%s", "TSUNAMI_PAYLOAD_START",'
              f' "{_SECRET}", "TSUNAMI_PAYLOAD_END")'
          ),
      ),
  )
  def test_generate_no_callback_returns_payload(
      self, config, expected_payload
  ):
    """generate_no_callback() yields the non-callback payload variant."""
    generated = self.payload_generator.generate_no_callback(config)
    self.assertIn(expected_payload, generated.payload)
    self.assertFalse(generated.get_payload_attributes().uses_callback_server)

  @requests_mock.mock()
  def test_check_if_executed_linux_and_payload_exec_returns_true(
      self, req_mock
  ):
    """An HTTP interaction alone marks the Linux payload as executed."""
    self._register_callback_response(
        req_mock, '{ "has_dns_interaction":false, "has_http_interaction":true}'
    )
    generated = self.payload_generator.generate(LINUX_REFLECTIVE_RCE_CONFIG)
    self.assertTrue(generated.check_if_executed())

  @requests_mock.mock()
  def test_check_if_executed_linux_and_payload_exec_with_dns_returns_true(
      self, req_mock
  ):
    """DNS plus HTTP interactions mark the Linux payload as executed."""
    self._register_callback_response(
        req_mock, '{ "has_dns_interaction":true, "has_http_interaction":true}'
    )
    generated = self.payload_generator.generate(LINUX_REFLECTIVE_RCE_CONFIG)
    self.assertTrue(generated.check_if_executed())

  @requests_mock.mock()
  def test_check_if_executed_linux_no_payload_exec_returns_false(
      self, req_mock
  ):
    """No recorded interaction means the Linux payload was not executed."""
    self._register_callback_response(
        req_mock, '{ "has_dns_interaction":false, "has_http_interaction":false}'
    )
    generated = self.payload_generator.generate(LINUX_REFLECTIVE_RCE_CONFIG)
    self.assertFalse(generated.check_if_executed())

  @requests_mock.mock()
  def test_check_if_executed_ssrf_and_payload_exec_returns_true(
      self, req_mock
  ):
    """A DNS interaction alone marks the SSRF payload as executed."""
    self._register_callback_response(
        req_mock, '{ "has_dns_interaction":true, "has_http_interaction":false}'
    )
    generated = self.payload_generator.generate(ANY_SSRF_CONFIG)
    self.assertTrue(generated.check_if_executed())

  @requests_mock.mock()
  def test_check_if_executed_ssrf_and_no_payload_exec_returns_false(
      self, req_mock
  ):
    """No recorded interaction means the SSRF payload was not executed."""
    self._register_callback_response(
        req_mock, '{ "has_dns_interaction":false, "has_http_interaction":false}'
    )
    generated = self.payload_generator.generate(ANY_SSRF_CONFIG)
    self.assertFalse(generated.check_if_executed())

  def test_generate_with_no_vulnerability_type_raises_lookup_error(self):
    """A config without vulnerability_type is rejected with LookupError."""
    config_cls = pg.PayloadGeneratorConfig
    self._assert_generate_raises_lookup_error(
        config_cls(
            interpretation_environment=(
                config_cls.InterpretationEnvironment.INTERPRETATION_ANY
            ),
            execution_environment=(
                config_cls.ExecutionEnvironment.EXEC_INTERPRETATION_ENVIRONMENT
            ),
        ),
        (
            'No payload implemented for VULNERABILITY_TYPE_UNSPECIFIED'
            ' vulnerability type, INTERPRETATION_ANY interpretation'
            ' environment, and EXEC_INTERPRETATION_ENVIRONMENT execution'
            ' environment.'
        ),
    )

  def test_generate_with_no_interpretation_environment_raises_error(self):
    """A config without interpretation_environment raises LookupError."""
    config_cls = pg.PayloadGeneratorConfig
    self._assert_generate_raises_lookup_error(
        config_cls(
            vulnerability_type=config_cls.VulnerabilityType.REFLECTIVE_RCE,
            execution_environment=(
                config_cls.ExecutionEnvironment.EXEC_INTERPRETATION_ENVIRONMENT
            ),
        ),
        (
            'No payload implemented for REFLECTIVE_RCE vulnerability type,'
            ' INTERPRETATION_ENVIRONMENT_UNSPECIFIED interpretation'
            ' environment, and EXEC_INTERPRETATION_ENVIRONMENT execution'
            ' environment.'
        ),
    )

  def test_generate_with_no_execution_environment_raises_lookup_error(self):
    """A config without execution_environment raises LookupError."""
    config_cls = pg.PayloadGeneratorConfig
    self._assert_generate_raises_lookup_error(
        config_cls(
            interpretation_environment=(
                config_cls.InterpretationEnvironment.INTERPRETATION_ANY
            ),
            vulnerability_type=config_cls.VulnerabilityType.REFLECTIVE_RCE,
        ),
        (
            'No payload implemented for REFLECTIVE_RCE vulnerability type,'
            ' INTERPRETATION_ANY interpretation environment, and'
            ' EXECUTION_ENVIRONMENT_UNSPECIFIED execution environment.'
        ),
    )

  def test_generate_with_no_config_raises_lookup_error(self):
    """A default-constructed config raises LookupError."""
    self._assert_generate_raises_lookup_error(
        pg.PayloadGeneratorConfig(),
        (
            'No payload implemented for VULNERABILITY_TYPE_UNSPECIFIED'
            ' vulnerability type, INTERPRETATION_ENVIRONMENT_UNSPECIFIED'
            ' interpretation environment, and EXECUTION_ENVIRONMENT_UNSPECIFIED'
            ' execution environment.'
        ),
    )
class PayloadGeneratorWithoutCallbackTest(parameterized.TestCase):
  """Tests PayloadGenerator when its TcsClient is built from empty values.

  The shared generator is created once per class from a secret generator
  stubbed to return `_SECRET`, the payload definitions returned by
  `get_parsed_payload()`, and `TcsClient('', 0, '', ...)`.
  """

  @staticmethod
  def _make_secret_generator():
    """Returns a PayloadSecretGenerator whose generate() yields `_SECRET`."""
    secret_generator = PayloadSecretGenerator()
    secret_generator.generate = mock.MagicMock(return_value=_SECRET)
    return secret_generator

  @staticmethod
  def _make_disabled_client():
    """Returns a TcsClient built with empty address/URL and port 0."""
    return TcsClient('', 0, '', RequestsHttpClientBuilder().build())

  @classmethod
  def setUpClass(cls):
    """Builds the shared callback-less PayloadGenerator for this class."""
    super().setUpClass()
    secret_generator = cls._make_secret_generator()
    parsed_payloads = get_parsed_payload()
    cls.payload_generator_without_callback_server = PayloadGenerator(
        secret_generator, parsed_payloads, cls._make_disabled_client()
    )

  def _assert_generate_raises_lookup_error(self, config, expected_message):
    """Asserts generate(config) raises LookupError with the exact message."""
    with self.assertRaises(LookupError) as raised:
      self.payload_generator_without_callback_server.generate(config)
    # Checked after the `with` block: inside it the line would never run.
    self.assertEqual(expected_message, str(raised.exception))

  def test_is_callback_server_enabled_with_no_callback_returns_false(self):
    """The generator reports the callback server as disabled."""
    generator = self.payload_generator_without_callback_server
    self.assertFalse(generator.is_callback_server_enabled())

  @parameterized.named_parameters(
      (
          'linux_config',
          LINUX_REFLECTIVE_RCE_CONFIG,
          f'printf %s%s%s TSUNAMI_PAYLOAD_START {_SECRET} TSUNAMI_PAYLOAD_END',
      ),
      (
          'ssrf_config',
          ANY_SSRF_CONFIG,
          'http://public-firing-range.appspot.com/',
      ),
      (
          'java_config',
          JAVA_REFLECTIVE_RCE_CONFIG,
          (
              'String.format("%s%s%s", "TSUNAMI_PAYLOAD_START",'
              f' "{_SECRET}", "TSUNAMI_PAYLOAD_END")'
          ),
      ),
  )
  def test_generate_without_callback_returns_payload(
      self, config, expected_payload
  ):
    """generate() returns exactly the non-callback payload string."""
    generated = self.payload_generator_without_callback_server.generate(config)
    self.assertEqual(expected_payload, generated.payload)
    self.assertFalse(generated.get_payload_attributes().uses_callback_server)

  @parameterized.named_parameters(
      (
          'linux_config',
          LINUX_REFLECTIVE_RCE_CONFIG,
          f'TSUNAMI_PAYLOAD_START{_SECRET}TSUNAMI_PAYLOAD_END',
      ),
      ('ssrf_config', ANY_SSRF_CONFIG, '<h1>What is the Firing Range?</h1>'),
      (
          'java_config',
          JAVA_REFLECTIVE_RCE_CONFIG,
          f'TSUNAMI_PAYLOAD_START{_SECRET}TSUNAMI_PAYLOAD_END',
      ),
  )
  def test_check_if_executed_with_correct_payload_input_returns_true(
      self, config, expected_payload
  ):
    """check_if_executed() accepts input containing the expected marker."""
    generated = self.payload_generator_without_callback_server.generate(config)
    response_bytes = bytes(expected_payload, 'utf-8')
    self.assertTrue(generated.check_if_executed(response_bytes))

  @parameterized.named_parameters(
      ('linux_config', LINUX_REFLECTIVE_RCE_CONFIG, 'Random input'),
      ('ssrf_config', ANY_SSRF_CONFIG, '<h1>Not Firing Range</h1>'),
      (
          'java_config',
          JAVA_REFLECTIVE_RCE_CONFIG,
          'TSUNAMI_PAYLOAD_START_Nothing_here_TSUNAMI_PAYLOAD_END',
      ),
  )
  def test_check_if_executed_with_bad_payload_input_returns_false(
      self, config, expected_payload
  ):
    """check_if_executed() rejects input lacking the expected marker."""
    generated = self.payload_generator_without_callback_server.generate(config)
    response_bytes = bytes(expected_payload, 'utf-8')
    self.assertFalse(generated.check_if_executed(response_bytes))

  def test_generate_with_no_callback_no_vulnerability_type_raises_error(self):
    """A config without vulnerability_type is rejected with LookupError."""
    config_cls = pg.PayloadGeneratorConfig
    self._assert_generate_raises_lookup_error(
        config_cls(
            interpretation_environment=(
                config_cls.InterpretationEnvironment.INTERPRETATION_ANY
            ),
            execution_environment=(
                config_cls.ExecutionEnvironment.EXEC_INTERPRETATION_ENVIRONMENT
            ),
        ),
        (
            'No payload implemented for VULNERABILITY_TYPE_UNSPECIFIED'
            ' vulnerability type, INTERPRETATION_ANY interpretation'
            ' environment, and EXEC_INTERPRETATION_ENVIRONMENT execution'
            ' environment.'
        ),
    )

  def test_generate_with_no_callback_no_interpretation_env_raises_error(self):
    """A config without interpretation_environment raises LookupError."""
    config_cls = pg.PayloadGeneratorConfig
    self._assert_generate_raises_lookup_error(
        config_cls(
            vulnerability_type=config_cls.VulnerabilityType.REFLECTIVE_RCE,
            execution_environment=(
                config_cls.ExecutionEnvironment.EXEC_INTERPRETATION_ENVIRONMENT
            ),
        ),
        (
            'No payload implemented for REFLECTIVE_RCE vulnerability type,'
            ' INTERPRETATION_ENVIRONMENT_UNSPECIFIED interpretation'
            ' environment, and EXEC_INTERPRETATION_ENVIRONMENT execution'
            ' environment.'
        ),
    )

  def test_generate_with_no_callback_and_no_execution_env_raises_error(self):
    """A config without execution_environment raises LookupError."""
    config_cls = pg.PayloadGeneratorConfig
    self._assert_generate_raises_lookup_error(
        config_cls(
            interpretation_environment=(
                config_cls.InterpretationEnvironment.INTERPRETATION_ANY
            ),
            vulnerability_type=config_cls.VulnerabilityType.REFLECTIVE_RCE,
        ),
        (
            'No payload implemented for REFLECTIVE_RCE vulnerability type,'
            ' INTERPRETATION_ANY interpretation environment, and'
            ' EXECUTION_ENVIRONMENT_UNSPECIFIED execution environment.'
        ),
    )

  def test_generate_with_no_callback_and_no_config_raises_lookup_error(self):
    """A default-constructed config raises LookupError."""
    self._assert_generate_raises_lookup_error(
        pg.PayloadGeneratorConfig(),
        (
            'No payload implemented for VULNERABILITY_TYPE_UNSPECIFIED'
            ' vulnerability type, INTERPRETATION_ENVIRONMENT_UNSPECIFIED'
            ' interpretation environment, and EXECUTION_ENVIRONMENT_UNSPECIFIED'
            ' execution environment.'
        ),
    )

  def test_generate_with_no_callback_and_no_validation_type_raises_error(self):
    """A payload definition with unspecified vulnerability type is rejected.

    Uses a dedicated generator whose only payload definition lists
    VULNERABILITY_TYPE_UNSPECIFIED, and expects NotImplementedError.
    """
    config_cls = pg.PayloadGeneratorConfig
    unspecified_definition = pg.PayloadDefinition(
        interpretation_environment=(
            config_cls.InterpretationEnvironment.LINUX_SHELL
        ),
        name=wrappers_pb2.StringValue(value='linux_printf'),
        execution_environment=(
            config_cls.ExecutionEnvironment.EXEC_INTERPRETATION_ENVIRONMENT
        ),
        vulnerability_type=[
            config_cls.VulnerabilityType.VULNERABILITY_TYPE_UNSPECIFIED
        ],
        uses_callback_server=wrappers_pb2.BoolValue(value=False),
        payload_string=wrappers_pb2.StringValue(
            value=(
                'printf %s%s%s TSUNAMI_PAYLOAD_START'
                ' $TSUNAMI_PAYLOAD_TOKEN_RANDOM TSUNAMI_PAYLOAD_END'
            )
        ),
    )
    payload_generator_no_callback = PayloadGenerator(
        self._make_secret_generator(),
        [unspecified_definition],
        self._make_disabled_client(),
    )
    with self.assertRaises(NotImplementedError) as raised:
      payload_generator_no_callback.generate(LINUX_UNSPECIFIED_CONFIG)
    self.assertEqual(
        'Validation type VULNERABILITY_TYPE_UNSPECIFIED not supported.',
        str(raised.exception),
    )

  def test_check_if_executed_with_no_callback_and_no_payload_raises_error(self):
    """check_if_executed() without input raises ValueError."""
    generated = self.payload_generator_without_callback_server.generate(
        LINUX_REFLECTIVE_RCE_CONFIG
    )
    with self.assertRaises(ValueError) as raised:
      generated.check_if_executed()
    self.assertEqual(
        'No valid payload input is entered.', str(raised.exception)
    )
# Hand control to absl's test runner when this file is executed directly.
if __name__ == '__main__':
  absltest.main()