ESA Rule Writing Best Practices
When working with RSA Live ESA or the ESA Rule Builder, you should not need to know the EPL syntax used within the rules. However, if your use case exceeds either of these capabilities, you should become familiar with at least the basics of the EsperTech EPL language used with ESA.
Note: NetWitness Platform 11.3 uses Esper 7.1. Earlier releases use Esper 5.3.
Inefficiently written EPL rules can degrade the performance of the ESA appliance, so it is important to write efficient EPL rules. See the following topics in the Alerting Using ESA Guide for details:
- Alerting: Best Practices, for best practices while working with ESA rules.
- Alerting: ESA Rule Types, for details on the available rule types.
This document contains hints, use cases, and FAQs related to developing statements using the Esper event processing language (EPL) with NetWitness ESA.
If you use any custom Esper Java libraries, and are having issues with accessing them in the NetWitness Platform, see the following Knowledge Base article that applies to your version of NetWitness:
- Cannot Access Custom Esper Java Libraries for RSA NetWitness Platform 11.3
- Cannot Access Custom Esper Java Libraries for RSA NetWitness Platform 11.4
Pivot to Investigate > Navigate from Respond May Not Work
In ESA rules that do not select every piece of meta from the session (that is, rules that do not use select *), data privacy (if enabled) and the Pivot to Investigate > Navigate link accessed from a context tooltip in the Respond Incident Details view may not work. For details on how to fix this, see the "Update any ESA Rule that Selects Only Certain Meta Keys from the Session to Include event_source_id" section in the Alerting with ESA Correlation Rules User Guide.
Performance Hints
The following table outlines some areas where you can make performance optimizations to the most common statement syntax.
- Recommendation:
Use group by for aggregation
- Example Rule:
Successful Logins
- Esper Documentation:
Use only group by for aggregation. Avoid using the std:groupwin view.
Documentation links:
- Esper 7.1: Basic Aggregated Query Types
- Esper 5.3: Aggregation and Grouping
- Recommendation:
Use hint with view .std:groupwin
- Example Rule:
HTTP Uploads - Office Documents Volume – High
- Esper Documentation:
If you must use the std:groupwin view for aggregation, add a hint to reclaim groups. If you are also using a time batch window view, the hint's time in seconds should be double the time window value.
Documentation Links:
- Esper 7.1: Grouped Data Window
- Esper 5.3: Grouped Data Window
- Recommendation:
Avoid the creation of many grouping windows
- Example Rule:
Excessive upload to File Cloud Services – High
- Esper Documentation:
Avoid creating many grouping windows (e.g., hundreds of thousands) over a long period if many unique values are expected.
- Recommendation:
Avoid statements with overly general filter conditions
- Example Rule:
-
- Esper Documentation:
Avoid alerts or general filter conditions that could cause hundreds of thousands of events to be added to a window. You can run the same filter in Investigation over the proposed time window to gauge how many events may match.
For health and wellness type alerts, see: Health and Wellness Alarms View and Health and Wellness Monitoring View.
- Recommendation:
Write strict MATCH RECOGNIZE patterns
- Example Rule:
Account Created And Deleted In Short Period Of Time
- Esper Documentation:
Avoid using quantifiers of + or * within MATCH RECOGNIZE statements if possible. The system will need to track fewer states. Consider writing a PATTERN-based statement if you need to use a loose pattern based on requirements.
Documentation Links:
- Esper 7.1: Operator Precedence
- Esper 5.3: Operator Precedence
- Recommendation:
Suppress Duplicate Matches
- Example Rule:
Detecting Dyreza infections
- Esper Documentation:
Use every-distinct for pattern matches, or the @SuppressOverlappingMatches annotation.
Documentation Links:
- Esper 7.1: Every-Distinct
- Esper 5.3: Every-Distinct
- Recommendation:
Use output rate limiting
- Example Rule:
Sources Communicating With Known Bad Hosts – High
- Esper Documentation:
Use output rate limiting, such as OUTPUT FIRST EVERY X MINUTES, to suppress generation of alerts if the use case allows. This saves database space.
Documentation Links:
- Esper 7.1: Output Reference and Samples
- Esper 5.3: Output Reference and Samples
Keep in mind that .win:time() behaves differently than .win:time_length_batch. With .win:time(), events are not removed from the window after the logic is triggered, so the same events may create additional alerts.
There are different ways to control generation of alerts within ESA. Using 'output first every x minutes' alerts only on the first set of events that match the statement. Subsequent alerts during that suppression timeframe are not stored.
- Recommendation:
Add functions such as .toLowerCase to meta keys only as needed
- Example Rule:
- Esper Documentation:
Using functions adds processing overhead: use them only when needed. For example, you do not need case-insensitive matching for meta keys storing IP addresses. Also, there are keys such as device.type with known, static case, where using a function would not produce different results.
For more performance tips, see the following Esper Reference documentation:
- Esper 7.1: Performance Tips
- Esper 5.3: Performance Tips
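The last recommendation in the table can be illustrated with a minimal sketch. It assumes that user_dst can arrive in mixed case while device_type and ip_src cannot; the literal values 'administrator' and '10.0.0.5' are hypothetical and only serve the illustration.

```epl
SELECT * FROM Event(
  /* user_dst may arrive in mixed case, so normalize it before comparing */
  user_dst.toLowerCase() = 'administrator'
  /* device_type has a known, static case: compare it directly */
  AND device_type = 'cacheflowelff'
  /* IP addresses have no case, so no function is needed */
  AND ip_src = '10.0.0.5'
);
```

Only the first condition pays the cost of a function call for each event.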
Use group by for aggregation: Successful Logins
Consider the following statements: one using groupwin, and the other without groupwin.
Using view groupwin:
@RSAAlert
SELECT * FROM
Event(
medium = 32
AND
ec_activity='Logon'
AND
ec_outcome='Success'
AND
ip_dst IS NOT NULL
AND
user_dst is NOT NULL
).std:groupwin(user_dst)
.win:time_batch(60 seconds)
group by user_dst
having count(distinct ip_dst) >= 2;
Without view groupwin:
@RSAAlert
SELECT * FROM
Event(
medium = 32
AND
ec_activity='Logon'
AND
ec_outcome='Success'
AND
ip_dst IS NOT NULL
AND
user_dst is NOT NULL
).win:time_batch(60 seconds)
group by user_dst
having count(distinct ip_dst) >= 2;
The second statement (without groupwin) works the same as the first, but uses less memory.
Use hint with view .std:groupwin: HTTP Uploads - Office Documents Volume - High
The @Hint("reclaim_group_aged=age_in_seconds") hint instructs the engine to discard an aggregation state that has not been updated for age_in_seconds seconds. Base the age_in_seconds value on the time window in the statement: with a time batch window, use double the window length, as in the following example (120 for a 60-second batch).
@Hint('reclaim_group_aged=120')
SELECT * FROM Event( medium = 32
AND (msg_id.toLowerCase() = 'post')
AND user_dst IS NOT NULL
AND extension IN ('pdf','doc','xls','docx','xlsx','ppt','pptx') AND rbytes > 10000
AND device_type='cacheflowelff'
AND alert IN ('VIP', 'CritAsset')
)
.win:time_batch(60 seconds)
GROUP BY user_dst HAVING COUNT(*) = 20;
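For a statement that does use the std:groupwin view, the hint might be combined with the grouped window as in the following sketch. The filter is deliberately reduced and the threshold of 20 is illustrative; the hint value of 120 assumes the guideline of doubling a 60-second time batch window.

```epl
/* Reclaim per-user group state that has not been updated for 120 seconds */
@Hint('reclaim_group_aged=120')
SELECT * FROM Event(
  medium = 32
  AND user_dst IS NOT NULL
).std:groupwin(user_dst)
.win:time_batch(60 seconds)
GROUP BY user_dst
HAVING COUNT(*) >= 20;
```

Without the hint, state for each distinct user_dst value could accumulate for as long as the statement is deployed.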
Write strict MATCH RECOGNIZE patterns: Account Created And Deleted In Short Period Of Time
The pattern defined for the match recognize statement should be strict. This means you should try to eliminate repetition operators such as + or *.
From:
SELECT * FROM Event(
medium = 32 AND
ec_subject='User' AND
ec_outcome='Success' AND
user_src is NOT NULL AND
( ec_activity='Create' OR ec_activity='Delete' )).win:time(7200 seconds) match_recognize
( partition by
user_src measures C as c,
D as d pattern (C+ D)
define C as C.ec_activity='Create' ,
D as D.ec_activity='Delete');
To:
SELECT * FROM Event(
medium = 32 AND
ec_subject='User' AND
ec_outcome='Success' AND
user_src is NOT NULL AND
( ec_activity='Create' OR ec_activity='Delete' )).win:time(7200 seconds) match_recognize
( partition by
user_src measures C as c,
D as d pattern (C D)
define C as C.ec_activity='Create' ,
D as D.ec_activity='Delete');
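If the requirement does call for a looser match, the same use case could be written as a PATTERN-based statement instead. The following is only a sketch under that assumption: it reuses the filter conditions and the 7200-second limit from the statements above, and the tag names c and d are arbitrary.

```epl
SELECT * FROM PATTERN @SuppressOverlappingMatches [
  /* A successful user creation starts a match attempt */
  every c = Event(medium = 32 AND ec_subject='User' AND ec_outcome='Success'
    AND user_src IS NOT NULL AND ec_activity='Create')
  /* followed within two hours by a deletion from the same user_src */
  -> (d = Event(medium = 32 AND ec_subject='User' AND ec_outcome='Success'
    AND ec_activity='Delete' AND user_src = c.user_src)
    where timer:within(7200 seconds))
];
```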
Suppress Duplicate Matches: Detecting Dyreza infections
In the following statement, a new match attempt is started for every a event. This means that multiple a events can match with the same b event, which could result in an unexpected and undesirable number of alerts for the same user during the time window.
SELECT * FROM PATTERN [ every a =
Event(device_class='Web Logs' AND host_dst = 'icanhazip.com')
-> b = Event(category LIKE '%Botnet%' AND device_class='Web Logs'
AND user_dst=a.user_dst) where timer:within(300 seconds) ];
Instead, we recommend using the pattern-level annotation @SuppressOverlappingMatches with the PATTERN syntax using every.
SELECT * FROM PATTERN @SuppressOverlappingMatches [ every
a = Event(device_class='Web Logs' AND host_dst = 'icanhazip.com')
-> b = Event(category LIKE '%Botnet%' AND device_class='Web Logs'
AND user_dst=a.user_dst) where timer:within(300 seconds) ];
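The other option named in the performance hints, every-distinct, might look like the following sketch. It assumes that user_dst is the value on which duplicates should be suppressed and that a 300-second expiry, chosen here only to mirror the timer:within value, is appropriate.

```epl
SELECT * FROM PATTERN [
  /* Start a new match attempt only for a user_dst not seen in the last 300 seconds */
  every-distinct(a.user_dst, 300 seconds)
  a = Event(device_class='Web Logs' AND host_dst = 'icanhazip.com')
  -> b = Event(category LIKE '%Botnet%' AND device_class='Web Logs'
       AND user_dst = a.user_dst) where timer:within(300 seconds)
];
```

The expiry period lets the engine forget distinct values over time, which keeps memory use bounded when many unique users are expected.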
Use output rate limiting: Sources Communicating With Known Bad Hosts – High
If a rule triggers frequently, you may want to store only the first occurrence within a time period per unique meta value. Subsequent alerts are then not stored and do not take up space in the database. This is separate from notification suppression, which can be done within the UI and does not influence alert storage.
The alerts below are suppressed per ip_src using the output first every syntax. Only the first alert within the 60-minute time window is stored in the database and alerted. To retain the constituent events within the alert, use the window aggregation window(*). Without window aggregation, the result would be only one of the 20 events per ip_src. If you do not need to keep all events for analysis, use select * instead of window aggregation.
From:
@Hint('reclaim_group_aged=300')
SELECT * FROM Event( medium = 32
AND alert_id LIKE 'known_bad%'
AND alert IN ('VIP' , 'CritAsset')
).std:groupwin(ip_src)
.win:time(1 Minutes)
GROUP BY ip_src HAVING COUNT(*) >= 20;
To:
@Hint('reclaim_group_aged=300')
SELECT window(*) FROM Event( medium = 32
AND alert_id LIKE 'known_bad%'
AND alert IN ('VIP' , 'CritAsset')
).std:groupwin(ip_src)
.win:time(1 Minutes)
GROUP BY ip_src HAVING COUNT(*) >= 20
output first every 60 minutes;
Use Cases
The solutions are explained through a series of comments within the solution boxes. Comments are surrounded by /* */.
Alert if events occur within a time interval and the absence of another event is detected
Statement Summary:
Alert after receiving 10 different IDS events from the same source within 10 minutes, but only if within those 10 minutes, we do not see a TCP RST sent from the destination IP.
This is an example of correlating packet and log data. Our F5 devices send a TCP RST in response to inbound web requests for unknown paths, so in this instance we want to be alerted only when a source directs 10 unique attacks at a single destination and that destination has not responded to the web requests.
Solution:
/* Required annotation to trigger an alert with the advanced statement */
@RSAAlert
/* Instruct the engine to remove duplicate matches to the first 'a' event */
SELECT * FROM pattern @SuppressOverlappingMatches
[/* Intrushield policy event */
every a=Event (
device_type IN ( 'intrushield' )
AND ip_src is not null
AND ip_dst is not null
AND policy_name is not null
AND policy_name NOT LIKE '%P2P%'
)
-> (/* The 10 minute time window following the first event */ timer:interval(600 seconds)
AND
/* 9 more Intrushield policy violations, each with a unique policy_name and the same ip_src and ip_dst. */
[9] b= Event (
device_type IN ( 'intrushield' )
AND ip_src=a.ip_src
AND ip_dst = a.ip_dst
AND policy_name is not null
AND policy_name NOT LIKE '%P2P%'
AND policy_name != a.policy_name
)/* Both the statement for event b and event c must evaluate to true for the syntax to match. In other words, no TCP RST can occur to match the pattern. */
AND NOT
c=Event (medium=1 AND tcp_flags_seen ='rst' AND ip_dst=a.ip_dst)
)
]/* Ensure that all b events have unique policy names among them. */
where b.distinctOf(i => i.policy_name).countOf() = 9;
Correlate events that arrive out of order
Statement Summary:
Correlate 3 events that populate the same ip_dst and occur within 30 minutes of each other, in any order.
For details on inner joins, see the following Esper Reference documentation:
- Esper 7.1: Inner Joins
- Esper 5.3: Inner Joins
/*
Intrusion Detection with Nonstandard HTTPS Traffic and ECAT Alert: Single host generates IPS alert on destination IP on port TCP/443 accompanied by traffic to TCP/443 that is not HTTPS with the target host generating an ECAT alert within 5 minutes.
*/
/* Create a window to store the IPS, nonstandard traffic and ECAT alerts */
@Name('create')
Create Window HttpsJoinedWindow.win:time(15 minutes)(device_class string, ip_dstport integer, service integer, tcp_dstport integer, device_type string, ip_dst string);
/* Insert into the window the IPS, nonstandard traffic and ECAT alerts */
@Name('insert')
INSERT INTO HttpsJoinedWindow
SELECT * FROM
Event
(
(ip_dst IS NOT NULL and device_class IN ('IPS', 'IDS', 'Firewall') AND ip_dstport=443) OR
(ip_dst IS NOT NULL and service!=443 and tcp_dstport=443) OR
(ip_dst IS NOT NULL and device_type='rsaecat')
);
/* Alert to the combination of all three events: IPS, nonstandard traffic and ECAT alerts */
@RSAAlert
INSERT INTO HttpsIntrusionTrigger
SELECT * FROM
HttpsJoinedWindow(ip_dst IS NOT NULL and device_class IN ('IPS', 'IDS', 'Firewall') AND ip_dstport=443) as s1,
HttpsJoinedWindow(ip_dst IS NOT NULL and service!=443 and tcp_dstport=443) as s2,
HttpsJoinedWindow(ip_dst IS NOT NULL and device_type='rsaecat') as s3
where s1.ip_dst = s2.ip_dst and s1.ip_dst = s3.ip_dst;
/* Delete all events from the joined window that caused the alert so they won't be reused */
@Name('delete')
on HttpsIntrusionTrigger delete from HttpsJoinedWindow as j where s1.ip_dst=j.ip_dst;
Only fire rule outside business hours
Statement Summary:
We want a rule to fire only if the event occurs outside business hours.
For details on non-overlapping contexts, see the following Esper Reference documentation:
- Esper 7.1: Non-Overlapping Context
- Esper 5.3: Non-Overlapping Context
Define non-working hours:
- Set the working hours as '09:00' – '18:00'
- Any event.cat.name LIKE system.config% after the working hours will trigger.
create context NotWorkingHours start (0, 18, *, *, *) end (0, 9, *, *, *);
context NotWorkingHours select * from Event(event_cat_name LIKE 'system.config%');
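For the opposite requirement, firing only during working hours, the start and end crontab expressions can be swapped. The following is a sketch under the same assumptions as above (working hours of 09:00 to 18:00 every day); the context name WorkingHours is hypothetical.

```epl
/* Crontab fields: minutes, hours, days of month, months, days of week */
create context WorkingHours start (0, 9, *, *, *) end (0, 18, *, *, *);
/* The statement evaluates events only while the context is active */
context WorkingHours select * from Event(event_cat_name LIKE 'system.config%');
```

Restricting the days-of-week field of both expressions would be one way to exclude weekends, if the use case requires it.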
Administrative Activity followed by User Creation
Solution:
- Added parentheses around the second and third events to require both the 2nd and 3rd events to occur after the 1st event.
- [...] so all 3 events would need to occur within that time window.
- Added @SuppressOverlappingMatches, a pattern-level annotation that instructs the engine to discard all but the first match among multiple, overlapping matches.
For details, see the Esper Reference documentation (Esper 7.1).
[...] snapshot when terminated;
[...] consult the Java API documentation or refer to Regular Expression Flavors (http://www.regular-expressions.info/refflavors.html).
[...] the system outputs everything that matches an ip_src [...] rather than across all ip_src values that match the filter criteria.
[...] you can view a rule's syntax, copy it to a new rule, and use this source to tune the rule for your needs.
[...] you can use Context Hub Lists in ESA Rules. Details are provided in the Configure Context Hub List as an Enrichment Source topic.
[...]3}\.[0-9]{1,3}\.[0-9]{1,3}
[...]3} represents a 3-digit number where each digit ranges from 0 to 9 [...] 1,2,....,997,998,999
[...]3}|172\.(3[01]|2[0-9]|1[6-9])|192\.168)\.[0-9]{1,3}\.[0-9]{1,3}
[...]3}\.[0-9]{1,3} specifies the entire range of 000.000 to 999.999.
[...] for example, if the username meta key contains the value Beta, the alert is triggered [...] no matter the case.
[...] {'alpha','beta','gamma'})
)
[...] if the username meta key does not contain alpha, beta or gamma (no matter the case for any of those strings), the alert is triggered. So, if the username meta key contains Alpha or BETA or gAmmA, the alert is not triggered.
[...] '%www.xn-%')
)
[...] '.*www.xn-.*')
)
[...] you can use other Java-supported syntax, such as getIntersection(alias_host, e1.alias_host).contains("abc") [...] e1.url).size() > 0)
[...] you should use contained event selection.
[...] policy2 string, policy_name string, email_str string);
/* Insert into the window the policy names, email source string and whether the event matches to policy1 or policy2 */
@Name('Filter')
ON Event (device_type = 'symantecdlp' AND
email_src IS NOT NULL AND
( EXISTS (SELECT * FROM policy1 WHERE (LIST = Event.policy_name)) OR EXISTS (SELECT * FROM policy2 WHERE (LIST = Event.policy_name))))
[select policy_name, email_str from asStringArray(email_src)@type(EmailContainer)] as e
INSERT INTO PolicyViolations select policy_name, "policy1" as policy1, NULL as policy2, email_str where EXISTS (SELECT * FROM policy1 WHERE (LIST = e.policy_name))
INSERT INTO PolicyViolations select policy_name [...]
[...] if at least one event matches policy1 and 3 or more events match policy2 for the individual email_str, then an alert is generated.
[...] see the following Esper Reference documentation: [...]
[...] see Context Hub Lists in ESA Rules.
[...] data window over a long time frame, it is better, in terms of memory usage, to use a named window instead [...] you can conserve memory usage by using a named window instead of a data window. The following example creates a window and stores email_src from the event stream matching the filter criteria for 12 hours. The example uses syntax for contained event selection, and splits the multi-valued meta key email_src into individual strings for analysis within the named window.
[...] { 'postmaster@abc.com', 'mail delivery subsystem', 'donotreply@abc.com' }))) [asStringArray(email_src)@type(PersonalEmailContainer)] ;
/* Alert when the email address is reported at least 10 times within a 12 hour period */
@RSAAlert
@Name('Alert')
INSERT INTO PersonalEmailAlert
SELECT *
FROM PersonalEmail as pe
GROUP BY email_str
HAVING COUNT(*) >= 10
OUTPUT FIRST EVERY 12 hours;
/* Delete all events from the window that caused the alert so they won't be reused in subsequent alerts */
@Name('Delete')
on PersonalEmailAlert as pea delete from PersonalEmail as pe where pe.email_str = pea.email_str;
[...] over ports 80, 443 or 8080, that are non-informational. After the events are filtered from the stream, they are stored in a data window for 15 minutes and aggregated by ip_src. When there is a count of 5 or more unique msg_ids across a single ip_src, then an alert is generated.
[...] 12-hour time period. The syntax for SELECT window(*) means to select all events within the data window [...] otherwise, only the last event within the window will be returned.
[...] 'sourcefire') AND direction = 'lateral' AND msg_id IS NOT NULL AND severity != 'Informational' AND (ip_dstport in (80,443, 8080) OR ip_srcport in (80,443, 8080))).win:time_batch(15 minutes)
GROUP BY ip_src
HAVING count(distinct msg_id) >= 5
OUTPUT first every 12 hours;
[...] which means that depending on the amount of traffic, there could be a performance impact. If so, adding to a report or report alert may be a better option. The view for unique seems unnecessary, because only the first event per ip_src is returned over the 24-hour period.
[...] we have the following structures: [...] mylist, and returning values from the list that match with values in alias_host. If the value does not exist (that is, it is not whitelisted), then the alert may be generated.
[...] {LIST})